Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3B15B200BC1 for ; Tue, 1 Nov 2016 09:40:33 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 39AEE160B07; Tue, 1 Nov 2016 08:40:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C9270160B14 for ; Tue, 1 Nov 2016 09:40:30 +0100 (CET) Received: (qmail 10719 invoked by uid 500); 1 Nov 2016 08:40:29 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 9137 invoked by uid 99); 1 Nov 2016 08:40:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 Nov 2016 08:40:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 867F7EAD9A; Tue, 1 Nov 2016 08:40:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Tue, 01 Nov 2016 08:40:52 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [25/50] [abbrv] flink git commit: [FLINK-4360] [tm] Implement TM -> JM registration logic archived-at: Tue, 01 Nov 2016 08:40:33 -0000 [FLINK-4360] [tm] Implement TM -> JM registration logic Upon requesting a slot for a new job, the TaskManager registers this job with the JobLeaderService. The job leader service is responsible for monitoring job leader changes for all registered jobs. 
In case of a new job leader, the service will try to establish a connection to the new job leader. Upon establishing the connection the task manager is informed about it. The task manager will then offer all allocated but not yet active slots to the new job leader. Implement JobLeaderService The JobLeaderService is responsible for establishing a connection to the JM leader of a given job. Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots Add simple task submission test Add job leader detection test case Add task slot acceptance test Fix RpcCompletenessTest Add comments This closes #2640. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b234521 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b234521 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b234521 Branch: refs/heads/flip-6 Commit: 2b234521388630274f63cdd7e52fbbe4cd7ee28b Parents: a122bfd Author: Till Rohrmann Authored: Wed Oct 5 17:02:06 2016 +0200 Committer: Till Rohrmann Committed: Tue Nov 1 09:39:31 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ReflectionUtil.java | 110 ++++ .../deployment/TaskDeploymentDescriptor.java | 11 +- .../runtime/executiongraph/ExecutionVertex.java | 2 + .../HighAvailabilityServices.java | 3 +- .../runtime/highavailability/NonHaServices.java | 4 +- .../highavailability/ZookeeperHaServices.java | 2 +- .../jobmaster/JMTMRegistrationSuccess.java | 45 ++ .../flink/runtime/jobmaster/JobMaster.java | 19 + .../runtime/jobmaster/JobMasterGateway.java | 36 ++ .../registration/RegisteredRpcConnection.java | 2 +- .../resourcemanager/ResourceManager.java | 2 +- .../slotmanager/SlotManager.java | 8 +- .../runtime/taskexecutor/JobLeaderListener.java | 60 +++ .../runtime/taskexecutor/JobLeaderService.java | 390 ++++++++++++++ .../taskexecutor/JobManagerConnection.java | 23 +- .../runtime/taskexecutor/JobManagerTable.java | 59 
+++ .../runtime/taskexecutor/TaskExecutor.java | 522 ++++++++++++++----- .../taskexecutor/TaskExecutorGateway.java | 25 +- ...TaskExecutorToResourceManagerConnection.java | 5 + .../runtime/taskexecutor/TaskManagerRunner.java | 2 + .../taskexecutor/TaskManagerServices.java | 24 +- .../exceptions/SlotAllocationException.java | 39 ++ .../taskexecutor/slot/TaskSlotTable.java | 39 +- .../apache/flink/runtime/taskmanager/Task.java | 2 +- .../TaskDeploymentDescriptorTest.java | 6 +- .../TestingHighAvailabilityServices.java | 2 +- .../metrics/groups/TaskManagerGroupTest.java | 10 +- .../slotmanager/SlotManagerTest.java | 2 +- .../slotmanager/SlotProtocolTest.java | 8 +- .../flink/runtime/rpc/RpcCompletenessTest.java | 29 +- .../runtime/taskexecutor/TaskExecutorTest.java | 431 ++++++++++++++- .../runtime/taskmanager/TaskAsyncCallTest.java | 3 +- .../runtime/taskmanager/TaskManagerTest.java | 75 +-- .../flink/runtime/taskmanager/TaskStopTest.java | 2 +- .../flink/runtime/taskmanager/TaskTest.java | 21 +- .../tasks/InterruptSensitiveRestoreTest.java | 32 +- .../streaming/runtime/tasks/StreamTaskTest.java | 25 +- 37 files changed, 1801 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java index b851eba..2883570 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java @@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; @Internal 
public final class ReflectionUtil { @@ -151,6 +154,113 @@ public final class ReflectionUtil { } /** + * Extract the full template type information from the given type's template parameter at the + * given position. + * + * @param type type to extract the full template parameter information from + * @param templatePosition describing at which position the template type parameter is + * @return Full type information describing the template parameter's type + */ + public static FullTypeInfo getFullTemplateType(Type type, int templatePosition) { + if (type instanceof ParameterizedType) { + return getFullTemplateType(((ParameterizedType) type).getActualTypeArguments()[templatePosition]); + } else { + throw new IllegalArgumentException(); + } + } + + /** + * Extract the full type information from the given type. + * + * @param type to be analyzed + * @return Full type information describing the given type + */ + public static FullTypeInfo getFullTemplateType(Type type) { + if (type instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) type; + + FullTypeInfo[] templateTypeInfos = new FullTypeInfo[parameterizedType.getActualTypeArguments().length]; + + for (int i = 0; i < parameterizedType.getActualTypeArguments().length; i++) { + templateTypeInfos[i] = getFullTemplateType(parameterizedType.getActualTypeArguments()[i]); + } + + return new FullTypeInfo((Class)parameterizedType.getRawType(), templateTypeInfos); + } else { + return new FullTypeInfo((Class) type, null); + } + } + + /** + * Container for the full type information of a type. This means that it contains the + * {@link Class} object and for each template parameter it contains a full type information + * describing the type. 
+ */ + public static class FullTypeInfo { + private final Class clazz; + private final FullTypeInfo[] templateTypeInfos; + + + public FullTypeInfo(Class clazz, FullTypeInfo[] templateTypeInfos) { + this.clazz = Preconditions.checkNotNull(clazz); + this.templateTypeInfos = templateTypeInfos; + } + + public Class getClazz() { + return clazz; + } + + public FullTypeInfo[] getTemplateTypeInfos() { + return templateTypeInfos; + } + + public Iterator> getClazzIterator() { + UnionIterator> unionIterator = new UnionIterator<>(); + + unionIterator.add(Collections.>singleton(clazz).iterator()); + + if (templateTypeInfos != null) { + for (int i = 0; i < templateTypeInfos.length; i++) { + unionIterator.add(templateTypeInfos[i].getClazzIterator()); + } + } + + return unionIterator; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + + builder.append(clazz.getSimpleName()); + + if (templateTypeInfos != null) { + builder.append("<"); + + for (int i = 0; i < templateTypeInfos.length - 1; i++) { + builder.append(templateTypeInfos[i]).append(", "); + } + + builder.append(templateTypeInfos[templateTypeInfos.length - 1]); + builder.append(">"); + } + + return builder.toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof FullTypeInfo) { + FullTypeInfo other = (FullTypeInfo) obj; + + return clazz == other.getClazz() && Arrays.equals(templateTypeInfos, other.getTemplateTypeInfos()); + } else { + return false; + } + } + } + + /** * Private constructor to prevent instantiation. 
*/ private ReflectionUtil() { http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 857628f..1093a5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -55,7 +55,7 @@ public final class TaskDeploymentDescriptor implements Serializable { private final ExecutionAttemptID executionId; /** The allocation ID of the slot in which the task shall be run */ - private final AllocationID allocationID; + private final AllocationID allocationId; /** The task's name. */ private final String taskName; @@ -105,6 +105,7 @@ public final class TaskDeploymentDescriptor implements Serializable { */ public TaskDeploymentDescriptor( JobID jobID, + AllocationID allocationId, String jobName, JobVertexID vertexID, ExecutionAttemptID executionId, @@ -130,6 +131,7 @@ public final class TaskDeploymentDescriptor implements Serializable { checkArgument(attemptNumber >= 0); this.jobID = checkNotNull(jobID); + this.allocationId = checkNotNull(allocationId); this.jobName = checkNotNull(jobName); this.vertexID = checkNotNull(vertexID); this.executionId = checkNotNull(executionId); @@ -148,11 +150,11 @@ public final class TaskDeploymentDescriptor implements Serializable { this.requiredClasspaths = checkNotNull(requiredClasspaths); this.targetSlotNumber = targetSlotNumber; this.taskStateHandles = taskStateHandles; - this.allocationID = new AllocationID(); } public TaskDeploymentDescriptor( JobID jobID, + AllocationID allocationId, String jobName, JobVertexID vertexID, 
ExecutionAttemptID executionId, @@ -173,6 +175,7 @@ public final class TaskDeploymentDescriptor implements Serializable { this( jobID, + allocationId, jobName, vertexID, executionId, @@ -311,8 +314,8 @@ public final class TaskDeploymentDescriptor implements Serializable { return requiredClasspaths; } - public AllocationID getAllocationID() { - return allocationID; + public AllocationID getAllocationId() { + return allocationId; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 8979d7c..eea2e81 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor; @@ -605,6 +606,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable blobPort, "The blob port has to be 0 < blobPort < 65536."); + + this.resourceID = Preconditions.checkNotNull(resourceID); + this.blobPort = blobPort; + } + + public ResourceID getResourceID() { + return resourceID; + } + + public int getBlobPort() { + return blobPort; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index e11f3a1..a7be476 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.client.SerializedJobExecutionResult; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; @@ -646,6 +647,24 @@ public class JobMaster extends RpcEndpoint { executionGraph.getRequiredClasspaths()); } + @RpcMethod + public Iterable offerSlots(final Iterable slots, UUID leaderId) { + throw new UnsupportedOperationException("Has to be implemented."); + } + + @RpcMethod + public void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause) { + throw new UnsupportedOperationException("Has to be implemented."); + } + + @RpcMethod + public RegistrationResponse registerTaskManager( + final String taskManagerAddress, + final ResourceID taskManagerProcessId, + final UUID leaderId) { + throw new UnsupportedOperationException("Has to be implemented."); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index b27b41c..0f155a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; @@ -170,4 +172,38 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * Request the classloading props of this job. */ Future requestClassloadingProps(); + + /** + * Offer the given slots to the job manager. The response contains the set of accepted slots. + * + * @param slots to offer to the job manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call + * @return Future set of accepted slots. 
+ */ + Future> offerSlots(final Iterable slots, UUID leaderId, @RpcTimeout final Time timeout); + + /** + * Fail the slot with the given allocation id and cause. + * + * @param allocationId identifying the slot to fail + * @param leaderId identifying the job leader + * @param cause of the failing + */ + void failSlot(final AllocationID allocationId, UUID leaderId, Exception cause); + + /** + * Register the task manager at the job manager. + * + * @param taskManagerAddress address of the task manager + * @param taskManagerProcessId identifying the task manager + * @param leaderId identifying the job leader + * @param timeout for the rpc call + * @return Future registration response indicating whether the registration was successful or not + */ + Future registerTaskManager( + final String taskManagerAddress, + final ResourceID taskManagerProcessId, + final UUID leaderId, + @RpcTimeout final Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index 76093b0..78d4dbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkState; /** * This utility class implements the basis of RPC connecting from one component to another component, * for example the RPC connection from TaskExecutor to ResourceManager. - * This {@code RegisteredRpcConnection} implements registration and get target gateway . 
+ * This {@code RegisteredRpcConnection} implements registration and get target gateway. * *

The registration gives access to a future that is completed upon successful registration. * The RPC connection can be closed, for example when the target where it tries to register http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3122804..6f6d525 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -188,7 +188,7 @@ public abstract class ResourceManager } else { try { LeaderRetrievalService jobMasterLeaderRetriever = - highAvailabilityServices.getJobManagerLeaderRetriever(jobID); + highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress); jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever); } catch (Exception e) { log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index e312ea2..f055971 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -298,7 +298,13 @@ public abstract class SlotManager { 
final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration(); final Future slotRequestReplyFuture = registration.getTaskExecutorGateway() - .requestSlot(freeSlot.getSlotId(), allocationID, rmServices.getLeaderID(), timeout); + .requestSlot( + freeSlot.getSlotId(), + slotRequest.getJobId(), + allocationID, + "foobar", // TODO: set proper JM address + rmServices.getLeaderID(), + timeout); slotRequestReplyFuture.handleAsync(new BiFunction() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java new file mode 100644 index 0000000..f02a8c2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; + +import java.util.UUID; + +/** + * Listener for the {@link JobLeaderService}. The listener is notified whenever a job manager + * gained leadership for a registered job and the service could establish a connection to it. + * Furthermore, the listener is notified when a job manager loses leadership for a job. In case + * of an error, the {@link #handleError(Throwable)}} is called. + */ +public interface JobLeaderListener { + + /** + * Callback if a job manager has gained leadership for the job identified by the job id and a + * connection could be established to this job manager. + * + * @param jobId identifying the job for which the job manager has gained leadership + * @param jobManagerGateway to the job leader + * @param jobLeaderId new leader id of the job leader + * @param registrationMessage containing further registration information + */ + void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, UUID jobLeaderId, JMTMRegistrationSuccess registrationMessage); + + /** + * Callback if the job leader for the job with the given job id lost its leadership. + * + * @param jobId identifying the job whose leader has lost leadership + * @param jobLeaderId old leader id + */ + void jobManagerLostLeadership(JobID jobId, UUID jobLeaderId); + + /** + * Callback for errors which might occur in the {@link JobLeaderService}. 
+ * + * @param throwable cause + */ + void handleError(Throwable throwable); +} http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java new file mode 100644 index 0000000..9e71349 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.registration.RegisteredRpcConnection; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.registration.RetryingRegistration; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executor; + +/** + * This service has the responsibility to monitor the job leaders (the job manager which is leader + * for a given job) for all registered jobs. Upon gaining leadership for a job and detection by the + * job leader service, the service tries to establish a connection to the job leader. After + * successfully establishing a connection, the job leader listener is notified about the new job + * leader and its connection. In case that a job leader loses leadership, the job leader listener + * is notified as well. 
+ */ +public class JobLeaderService { + + private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class); + + /** Process id of the owning process */ + private final ResourceID ownerProcessId; + + /** The leader retrieval service and listener for each registered job */ + private final Map> jobLeaderServices; + + /** Internal state of the service */ + private volatile JobLeaderService.State state; + + /** Address of the owner of this service. This address is used for the job manager connection */ + private String ownerAddress; + + /** Rpc service to use for establishing connections */ + private RpcService rpcService; + + /** High availability services to create the leader retrieval services from */ + private HighAvailabilityServices highAvailabilityServices; + + /** Job leader listener listening for job leader changes */ + private JobLeaderListener jobLeaderListener; + + public JobLeaderService(ResourceID ownerProcessId) { + this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId); + + jobLeaderServices = new HashMap<>(4); + + state = JobLeaderService.State.CREATED; + + ownerAddress = null; + rpcService = null; + highAvailabilityServices = null; + jobLeaderListener = null; + } + + // ------------------------------------------------------------------------------- + // Methods + // ------------------------------------------------------------------------------- + + /** + * Start the job leader service with the given services. 
+ * + * @param initialOwnerAddress to be used for establishing connections (source address) + * @param initialRpcService to be used to create rpc connections + * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs + * @param initialJobLeaderListener listening for job leader changes + */ + public void start( + final String initialOwnerAddress, + final RpcService initialRpcService, + final HighAvailabilityServices initialHighAvailabilityServices, + final JobLeaderListener initialJobLeaderListener) { + + if (JobLeaderService.State.CREATED != state) { + throw new IllegalStateException("The service has already been started."); + } else { + LOG.info("Start job leader service."); + + this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress); + this.rpcService = Preconditions.checkNotNull(initialRpcService); + this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices); + this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener); + state = JobLeaderService.State.STARTED; + } + } + + /** + * Stop the job leader services. This implies stopping all leader retrieval services for the + * different jobs and their leader retrieval listeners. + * + * @throws Exception if an error occurs while stopping the service + */ + public void stop() throws Exception { + LOG.info("Stop job leader service."); + + if (JobLeaderService.State.STARTED == state) { + + for (Tuple2 leaderRetrievalServiceEntry: jobLeaderServices.values()) { + LeaderRetrievalService leaderRetrievalService = leaderRetrievalServiceEntry.f0; + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = leaderRetrievalServiceEntry.f1; + + jobManagerLeaderListener.stop(); + leaderRetrievalService.stop(); + } + + jobLeaderServices.clear(); + } + + state = JobLeaderService.State.STOPPED; + } + + /** + * Check whether the service monitors the given job. 
+ * + * @param jobId identifying the job + * @return True if the given job is monitored; otherwise false + */ + public boolean containsJob(JobID jobId) { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + return jobLeaderServices.containsKey(jobId); + } + + /** + * Remove the given job from being monitored by the job leader service. + * + * @param jobId identifying the job to remove from monitoring + * @throws Exception if an error occurred while stopping the leader retrieval service and listener + */ + public void removeJob(JobID jobId) throws Exception { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + Tuple2 entry = jobLeaderServices.remove(jobId); + + if (entry != null) { + LOG.info("Remove job {} from job leader monitoring.", jobId); + + LeaderRetrievalService leaderRetrievalService = entry.f0; + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = entry.f1; + + leaderRetrievalService.stop(); + jobManagerLeaderListener.stop(); + } + } + + /** + * Add the given job to be monitored. This means that the service tries to detect leaders for + * this job and then tries to establish a connection to it. 
+ * + * @param jobId identifying the job to monitor + * @param defaultTargetAddress of the job leader + * @throws Exception if an error occurs while starting the leader retrieval service + */ + public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception { + Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running."); + + LOG.info("Add job {} for job leader monitoring.", jobId); + + final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever( + jobId, + defaultTargetAddress); + + JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId); + + leaderRetrievalService.start(jobManagerLeaderListener); + + jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener)); + } + + /** + * Leader listener which tries to establish a connection to a newly detected job leader. + */ + private final class JobManagerLeaderListener implements LeaderRetrievalListener { + + /** Job id identifying the job to look for a leader */ + private final JobID jobId; + + /** Rpc connection to the job leader */ + private RegisteredRpcConnection rpcConnection; + + /** State of the listener */ + private volatile boolean stopped; + + /** Leader id of the current job leader */ + private volatile UUID currentLeaderId; + + private JobManagerLeaderListener(JobID jobId) { + this.jobId = Preconditions.checkNotNull(jobId); + + stopped = false; + rpcConnection = null; + currentLeaderId = null; + } + + public void stop() { + stopped = true; + + if (rpcConnection != null) { + rpcConnection.close(); + } + } + + @Override + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) { + if (stopped) { + LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. 
" + + "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId); + } else { + LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", + jobId, leaderAddress, leaderId); + + if (leaderAddress == null || leaderAddress.isEmpty()) { + // the leader lost leadership but there is no other leader yet. + if (rpcConnection != null) { + rpcConnection.close(); + } + + jobLeaderListener.jobManagerLostLeadership(jobId, currentLeaderId); + + currentLeaderId = leaderId; + } else { + currentLeaderId = leaderId; + + if (rpcConnection != null) { + // check if we are already trying to connect to this leader + if (!leaderId.equals(rpcConnection.getTargetLeaderId())) { + rpcConnection.close(); + + rpcConnection = new JobManagerRegisteredRpcConnection( + LOG, + leaderAddress, + leaderId, + rpcService.getExecutor()); + } + } else { + rpcConnection = new JobManagerRegisteredRpcConnection( + LOG, + leaderAddress, + leaderId, + rpcService.getExecutor()); + } + + // double check for a concurrent stop operation + if (stopped) { + rpcConnection.close(); + } else { + LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId); + rpcConnection.start(); + } + } + } + } + + @Override + public void handleError(Exception exception) { + if (stopped) { + LOG.debug("{}'s leader retrieval listener reported an exception for job {}. " + + "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), + jobId, exception); + } else { + jobLeaderListener.handleError(exception); + } + } + + /** + * Rpc connection for the job manager <--> task manager connection. 
+ */ + private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection { + + JobManagerRegisteredRpcConnection( + Logger log, + String targetAddress, + UUID targetLeaderId, + Executor executor) { + super(log, targetAddress, targetLeaderId, executor); + } + + @Override + protected RetryingRegistration generateRegistration() { + return new JobLeaderService.JobManagerRetryingRegistration( + LOG, + rpcService, + "JobManager", + JobMasterGateway.class, + getTargetAddress(), + getTargetLeaderId(), + ownerAddress, + ownerProcessId); + } + + @Override + protected void onRegistrationSuccess(JMTMRegistrationSuccess success) { + // filter out old registration attempts + if (getTargetLeaderId().equals(currentLeaderId)) { + log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId); + + jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success); + } else { + log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId()); + } + } + + @Override + protected void onRegistrationFailure(Throwable failure) { + // filter out old registration attempts + if (getTargetLeaderId().equals(currentLeaderId)) { + log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId); + jobLeaderListener.handleError(failure); + } else { + log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure); + } + } + } + } + + /** + * Retrying registration for the job manager <--> task manager connection. 
+ */ + private static final class JobManagerRetryingRegistration extends RetryingRegistration { + + private final String taskManagerAddress; + private final ResourceID taskManagerProcessId; + + JobManagerRetryingRegistration( + Logger log, + RpcService rpcService, + String targetName, + Class targetType, + String targetAddress, + UUID leaderId, + String taskManagerAddress, + ResourceID taskManagerProcessId) { + + super(log, rpcService, targetName, targetType, targetAddress, leaderId); + + this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); + this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId); + } + + @Override + protected Future invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception { + return gateway.registerTaskManager( + taskManagerAddress, + taskManagerProcessId, + leaderId, + Time.milliseconds(timeoutMillis)); + } + } + + /** + * Internal state of the service + */ + private enum State { + CREATED, STARTED, STOPPED + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java index 6fcd082..8d2057a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java @@ -34,7 +34,7 @@ import java.util.UUID; public class JobManagerConnection { // Job master leader session id - private final UUID jobMasterLeaderId; + private final UUID leaderId; // Gateway to the job master private final JobMasterGateway jobMasterGateway; @@ -55,15 +55,14 @@ public class JobManagerConnection { private 
final PartitionStateChecker partitionStateChecker; public JobManagerConnection( - UUID jobMasterLeaderId, - JobMasterGateway jobMasterGateway, - TaskManagerActions taskManagerActions, - CheckpointResponder checkpointResponder, - LibraryCacheManager libraryCacheManager, - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, - PartitionStateChecker partitionStateChecker) - { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); + JobMasterGateway jobMasterGateway, + UUID leaderId, + TaskManagerActions taskManagerActions, + CheckpointResponder checkpointResponder, + LibraryCacheManager libraryCacheManager, + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, + PartitionStateChecker partitionStateChecker) { + this.leaderId = Preconditions.checkNotNull(leaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions); this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder); @@ -72,8 +71,8 @@ public class JobManagerConnection { this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker); } - public UUID getJobMasterLeaderId() { - return jobMasterLeaderId; + public UUID getLeaderId() { + return leaderId; } public JobMasterGateway getJobManagerGateway() { http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java new file mode 100644 index 0000000..00c467e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; + +import java.util.HashMap; +import java.util.Map; + +/** + * Container for multiple {@link JobManagerConnection} registered under their respective job id. + */ +public class JobManagerTable { + private final Map jobManagerConnections; + + public JobManagerTable() { + jobManagerConnections = new HashMap<>(4); + } + + public boolean contains(JobID jobId) { + return jobManagerConnections.containsKey(jobId); + } + + public boolean put(JobID jobId, JobManagerConnection jobManagerConnection) { + JobManagerConnection previousJMC = jobManagerConnections.put(jobId, jobManagerConnection); + + if (previousJMC != null) { + jobManagerConnections.put(jobId, previousJMC); + + return false; + } else { + return true; + } + } + + public JobManagerConnection remove(JobID jobId) { + return jobManagerConnections.remove(jobId); + } + + public JobManagerConnection get(JobID jobId) { + return jobManagerConnections.get(jobId); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e642315..3e3a544 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.taskexecutor; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobCache; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -34,7 +36,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; @@ -42,6 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; 
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; @@ -51,7 +53,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -59,6 +60,7 @@ import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException; import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; @@ -81,9 +83,10 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.UUID; +import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -97,6 +100,9 @@ public class TaskExecutor extends RpcEndpoint { /** The connection information of this task manager */ private final TaskManagerLocation 
taskManagerLocation; + /** Max blob port which is accepted */ + public static final int MAX_BLOB_PORT = 65536; + /** The access to the leader election and retrieval services */ private final HighAvailabilityServices haServices; @@ -121,10 +127,6 @@ public class TaskExecutor extends RpcEndpoint { private final TaskManagerMetricGroup taskManagerMetricGroup; private final BroadcastVariableManager broadcastVariableManager; - - /** Slots which have become available but haven't been confirmed by the RM */ - private final Set unconfirmedFreeSlots; - private final FileCache fileCache; @@ -140,6 +142,10 @@ public class TaskExecutor extends RpcEndpoint { private final TaskSlotTable taskSlotTable; + private final JobManagerTable jobManagerTable; + + private final JobLeaderService jobLeaderService; + // ------------------------------------------------------------------------ public TaskExecutor( @@ -155,6 +161,8 @@ public class TaskExecutor extends RpcEndpoint { BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, + JobManagerTable jobManagerTable, + JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) { super(rpcService); @@ -173,10 +181,10 @@ public class TaskExecutor extends RpcEndpoint { this.taskManagerMetricGroup = checkNotNull(taskManagerMetricGroup); this.broadcastVariableManager = checkNotNull(broadcastVariableManager); this.fileCache = checkNotNull(fileCache); + this.jobManagerTable = checkNotNull(jobManagerTable); + this.jobLeaderService = checkNotNull(jobLeaderService); this.jobManagerConnections = new HashMap<>(4); - - this.unconfirmedFreeSlots = new HashSet<>(); } // ------------------------------------------------------------------------ @@ -195,7 +203,10 @@ public class TaskExecutor extends RpcEndpoint { } // tell the task slot table who's responsible for the task slot actions - taskSlotTable.start(new SlotActionsImpl(), taskManagerConfiguration.getTimeout()); + taskSlotTable.start(new 
SlotActionsImpl()); + + // start the job leader service + jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl()); } /** @@ -207,7 +218,7 @@ public class TaskExecutor extends RpcEndpoint { taskSlotTable.stop(); - if (resourceManagerConnection.isConnected()) { + if (isConnectedToResourceManager()) { try { resourceManagerConnection.close(); } catch (Exception e) { @@ -248,30 +259,39 @@ public class TaskExecutor extends RpcEndpoint { log.info("Stopped TaskManager {}.", getAddress()); } - // ======================================================================== + // ====================================================================== // RPC methods - // ======================================================================== + // ====================================================================== // ---------------------------------------------------------------------- // Task lifecycle RPCs // ---------------------------------------------------------------------- @RpcMethod - public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException { + public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID jobManagerLeaderId) throws TaskSubmissionException { - JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID); + JobManagerConnection jobManagerConnection = jobManagerTable.get(tdd.getJobID()); if (jobManagerConnection == null) { - final String message = "Could not submit task because JobManager " + jobManagerID + - " was not associated."; + final String message = "Could not submit task because there is no JobManager " + + "associated for the job " + tdd.getJobID() + '.'; + + log.debug(message); + throw new TaskSubmissionException(message); + } + + if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { + final String message = "Rejecting the task submission because the job manager leader id " + + jobManagerLeaderId + " does not 
match the expected job manager leader id " + + jobManagerConnection.getLeaderId() + '.'; log.debug(message); throw new TaskSubmissionException(message); } - if (!taskSlotTable.existActiveSlot(tdd.getJobID(), tdd.getAllocationID())) { + if (!taskSlotTable.existsActiveSlot(tdd.getJobID(), tdd.getAllocationId())) { final String message = "No task slot allocated for job ID " + tdd.getJobID() + - " and allocation ID " + tdd.getAllocationID() + '.'; + " and allocation ID " + tdd.getAllocationId() + '.'; log.debug(message); throw new TaskSubmissionException(message); } @@ -279,7 +299,7 @@ public class TaskExecutor extends RpcEndpoint { TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd); InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( - jobManagerConnection.getJobMasterLeaderId(), + jobManagerConnection.getLeaderId(), jobManagerConnection.getJobManagerGateway(), tdd.getJobID(), tdd.getVertexID(), @@ -375,7 +395,7 @@ public class TaskExecutor extends RpcEndpoint { // ---------------------------------------------------------------------- @RpcMethod - public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection partitionInfos) throws PartitionException { + public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable partitionInfos) throws PartitionException { final Task task = taskSlotTable.getTask(executionAttemptID); if (task != null) { @@ -471,38 +491,319 @@ public class TaskExecutor extends RpcEndpoint { } } + // ---------------------------------------------------------------------- + // Slot allocation RPCs + // ---------------------------------------------------------------------- + /** + * /** * Requests a slot from the TaskManager * - * @param slotID Slot id for the request - * @param allocationID id for the request - * @param resourceManagerLeaderID current leader id of the ResourceManager + * @param slotId identifying the requested slot + * @param jobId 
identifying the job for which the request is issued + * @param allocationId id for the request + * @param targetAddress of the job manager requesting the slot + * @param rmLeaderId current leader id of the ResourceManager + * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ @RpcMethod - public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) { - if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) { - return new TMSlotRequestRejected( - resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID); + public TMSlotRequestReply requestSlot( + final SlotID slotId, + final JobID jobId, + final AllocationID allocationId, + final String targetAddress, + final UUID rmLeaderId) throws SlotAllocationException { + log.info("Receive slot request {} for job {} from resource manager with leader id {}.", + allocationId, jobId, rmLeaderId); + + if (resourceManagerConnection == null) { + final String message = "TaskManager is not connected to a resource manager."; + log.debug(message); + throw new SlotAllocationException(message); } - if (unconfirmedFreeSlots.contains(slotID)) { - // check if request has not been blacklisted because the notification of a free slot - // has not been confirmed by the ResourceManager - return new TMSlotRequestRejected( - resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID); + + if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) { + final String message = "The leader id " + rmLeaderId + + " does not match with the leader id of the connected resource manager " + + resourceManagerConnection.getTargetLeaderId() + '.'; + + log.debug(message); + throw new SlotAllocationException(message); + } + + if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, 
taskManagerConfiguration.getTimeout())) { + log.info("Allocated slot for {}.", allocationId); + } else { + log.info("Could not allocate slot for {}.", allocationId); + throw new SlotAllocationException("Could not allocate slot."); + } + } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) { + final String message = "The slot " + slotId + " has already been allocated for a different job."; + + log.info(message); + throw new SlotAllocationException(message); + } + + if (jobManagerTable.contains(jobId)) { + offerSlotsToJobManager(jobId); + } else { + try { + jobLeaderService.addJob(jobId, targetAddress); + } catch (Exception e) { + // free the allocated slot + try { + taskSlotTable.freeSlot(allocationId); + } catch (SlotNotFoundException slotNotFoundException) { + // slot no longer existent, this should actually never happen, because we've + // just allocated the slot. So let's fail hard in this case! + onFatalError(slotNotFoundException); + } + + // sanity check + if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) { + onFatalError(new Exception("Could not free slot " + slotId)); + } + + throw new SlotAllocationException("Could not add job to job leader service.", e); + } } - return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID); + return new TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId); } - // ------------------------------------------------------------------------ + // ====================================================================== // Internal methods + // ====================================================================== + + // ------------------------------------------------------------------------ + // Internal resource manager connection methods + // ------------------------------------------------------------------------ + + private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { + if 
(resourceManagerConnection != null) { + if (newLeaderAddress != null) { + // the resource manager switched to a new leader + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", + resourceManagerConnection.getTargetAddress(), newLeaderAddress); + } + else { + // address null means that the current leader is lost without a new leader being there, yet + log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", + resourceManagerConnection.getTargetAddress()); + } + + // drop the current connection or connection attempt + if (resourceManagerConnection != null) { + resourceManagerConnection.close(); + resourceManagerConnection = null; + } + } + + // establish a connection to the new leader + if (newLeaderAddress != null) { + log.info("Attempting to register at ResourceManager {}", newLeaderAddress); + resourceManagerConnection = + new TaskExecutorToResourceManagerConnection( + log, + this, + newLeaderAddress, + newLeaderId, + getMainThreadExecutor()); + resourceManagerConnection.start(); + } + } + // ------------------------------------------------------------------------ + // Internal job manager connection methods + // ------------------------------------------------------------------------ + + private void offerSlotsToJobManager(final JobID jobId) { + final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId); + + if (jobManagerConnection == null) { + log.debug("There is no job manager connection to the leader of job {}.", jobId); + } else { + if (taskSlotTable.hasAllocatedSlots(jobId)) { + log.info("Offer reserved slots to the leader of job {}.", jobId); + + final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway(); + + final Iterator reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); + final UUID leaderId = jobManagerConnection.getLeaderId(); + + final Collection reservedSlots = new HashSet<>(2); + + while 
(reservedSlotsIterator.hasNext()) { + reservedSlots.add(reservedSlotsIterator.next()); + } + + Future> acceptedSlotsFuture = jobMasterGateway.offerSlots( + reservedSlots, + leaderId, + taskManagerConfiguration.getTimeout()); + + acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction>() { + @Override + public void accept(Iterable acceptedSlots) { + // check if the response is still valid + if (isJobManagerConnectionValid(jobId, leaderId)) { + // mark accepted slots active + for (AllocationID acceptedSlot: acceptedSlots) { + try { + if (!taskSlotTable.markSlotActive(acceptedSlot)) { + // the slot is either free or releasing at the moment + final String message = "Could not mark slot " + jobId + " active."; + log.debug(message); + jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message)); + } + + // remove the assigned slots so that we can free the left overs + reservedSlots.remove(acceptedSlot); + } catch (SlotNotFoundException e) { + log.debug("Could not mark slot {} active.", acceptedSlot, e); + jobMasterGateway.failSlot(acceptedSlot, leaderId, e); + } + } + + final Exception e = new Exception("The slot was rejected by the JobManager."); + + for (AllocationID rejectedSlot: reservedSlots) { + freeSlot(rejectedSlot, e); + } + } else { + // discard the response since there is a new leader for the job + log.debug("Discard offer slot response since there is a new leader " + + "for the job {}.", jobId); + } + } + }, getMainThreadExecutor()); + + acceptedSlotsFuture.exceptionallyAsync(new ApplyFunction() { + @Override + public Void apply(Throwable throwable) { + if (throwable instanceof TimeoutException) { + // We ran into a timeout. Try again. + offerSlotsToJobManager(jobId); + } else { + // We encountered an exception. Free the slots and return them to the RM. 
+ for (AllocationID reservedSlot: reservedSlots) { + freeSlot(reservedSlot, throwable); + } + } + + return null; + } + }, getMainThreadExecutor()); + } else { + log.debug("There are no unassigned slots for the job {}.", jobId); + } + } + } + + private void establishJobManagerConnection(JobID jobId, JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) { + log.info("Establish JobManager connection for job {}.", jobId); + + if (jobManagerTable.contains(jobId)) { + JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId); + + if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { + closeJobManagerConnection(jobId); + jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort())); + } + } else { + jobManagerTable.put(jobId, associateWithJobManager(jobMasterGateway, jobManagerLeaderId, registrationSuccess.getBlobPort())); + } - private JobManagerConnection getJobManagerConnection(ResourceID jobManagerID) { - return jobManagerConnections.get(jobManagerID); + offerSlotsToJobManager(jobId); } + private void closeJobManagerConnection(JobID jobId) { + log.info("Close JobManager connection for job {}.", jobId); + + // 1. fail tasks running under this JobID + Iterator tasks = taskSlotTable.getTasks(jobId); + + while (tasks.hasNext()) { + tasks.next().failExternally(new Exception("JobManager responsible for " + jobId + + " lost the leadership.")); + } + + // 2. 
Move the active slots to state allocated (possible to time out again) + Iterator activeSlots = taskSlotTable.getActiveSlots(jobId); + + while (activeSlots.hasNext()) { + AllocationID activeSlot = activeSlots.next(); + + try { + if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) { + freeSlot(activeSlot, new Exception("Slot could not be marked inactive.")); + } + } catch (SlotNotFoundException e) { + log.debug("Could not mark the slot {} inactive.", jobId, e); + } + } + + // 3. Disassociate from the JobManager + JobManagerConnection jobManagerConnection = jobManagerTable.remove(jobId); + + if (jobManagerConnection != null) { + try { + disassociateFromJobManager(jobManagerConnection); + } catch (IOException e) { + log.warn("Could not properly disassociate from JobManager {}.", + jobManagerConnection.getJobManagerGateway().getAddress(), e); + } + } + } + + private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) { + Preconditions.checkNotNull(jobManagerLeaderId); + Preconditions.checkNotNull(jobMasterGateway); + Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob port is out of range."); + + TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway); + + CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); + + InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); + + BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); + + LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( + blobCache, + taskManagerConfiguration.getCleanupInterval()); + + ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( + jobManagerLeaderId, + jobMasterGateway, + getRpcService().getExecutor(), + taskManagerConfiguration.getTimeout()); + + 
PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway); + + return new JobManagerConnection( + jobMasterGateway, + jobManagerLeaderId, + taskManagerActions, + checkpointResponder, + libraryCacheManager, + resultPartitionConsumableNotifier, + partitionStateChecker); + } + + private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { + Preconditions.checkNotNull(jobManagerConnection); + JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); + jobManagerGateway.disconnectTaskManager(getResourceID()); + jobManagerConnection.getLibraryCacheManager().shutdown(); + } + + // ------------------------------------------------------------------------ + // Internal task methods + // ------------------------------------------------------------------------ + private void failTask(final ExecutionAttemptID executionAttemptID, final Throwable cause) { final Task task = taskSlotTable.getTask(executionAttemptID); @@ -571,94 +872,11 @@ public class TaskExecutor extends RpcEndpoint { } } - private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { - if (resourceManagerConnection != null) { - if (newLeaderAddress != null) { - // the resource manager switched to a new leader - log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), newLeaderAddress); - } - else { - // address null means that the current leader is lost without a new leader being there, yet - log.info("Current ResourceManager {} lost leader status. 
Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); - } - - // drop the current connection or connection attempt - if (resourceManagerConnection != null) { - resourceManagerConnection.close(); - resourceManagerConnection = null; - } - } - - unconfirmedFreeSlots.clear(); - - // establish a connection to the new leader - if (newLeaderAddress != null) { - log.info("Attempting to register at ResourceManager {}", newLeaderAddress); - resourceManagerConnection = - new TaskExecutorToResourceManagerConnection( - log, - this, - newLeaderAddress, - newLeaderId, - getMainThreadExecutor()); - resourceManagerConnection.start(); - } - } - - private JobManagerConnection associateWithJobManager(UUID jobMasterLeaderId, - JobMasterGateway jobMasterGateway, int blobPort) - { - Preconditions.checkNotNull(jobMasterLeaderId); - Preconditions.checkNotNull(jobMasterGateway); - Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, "Blob port is out of range."); - - TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway); - - CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); - - InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort); - - BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration()); - - LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager( - blobCache, - taskManagerConfiguration.getCleanupInterval()); - - ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( - jobMasterLeaderId, - jobMasterGateway, - getRpcService().getExecutor(), - taskManagerConfiguration.getTimeout()); - - PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway); - - return new JobManagerConnection( - jobMasterLeaderId, - jobMasterGateway, - taskManagerActions, - 
checkpointResponder, - libraryCacheManager, - resultPartitionConsumableNotifier, - partitionStateChecker); - } - - private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException { - if (jobManagerConnection != null) { - JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway(); - - jobManagerGateway.disconnectTaskManager(getResourceID()); - - jobManagerConnection.getLibraryCacheManager().shutdown(); - } - } - - private void freeSlot(AllocationID allocationId) { + private void freeSlot(AllocationID allocationId, Throwable cause) { Preconditions.checkNotNull(allocationId); try { - int freedSlotIndex = taskSlotTable.freeSlot(allocationId); + int freedSlotIndex = taskSlotTable.freeSlot(allocationId, cause); if (freedSlotIndex != -1 && isConnectedToResourceManager()) { // the slot was freed. Tell the RM about it @@ -674,21 +892,35 @@ public class TaskExecutor extends RpcEndpoint { } } + private void freeSlot(AllocationID allocationId) { + freeSlot(allocationId, new Exception("The slot " + allocationId + " is being freed.")); + } + private void timeoutSlot(AllocationID allocationId, UUID ticket) { Preconditions.checkNotNull(allocationId); Preconditions.checkNotNull(ticket); if (taskSlotTable.isValidTimeout(allocationId, ticket)) { - freeSlot(allocationId); + freeSlot(allocationId, new Exception("The slot " + allocationId + " has timed out.")); } else { log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationId, ticket); } } + // ------------------------------------------------------------------------ + // Internal utility methods + // ------------------------------------------------------------------------ + private boolean isConnectedToResourceManager() { return (resourceManagerConnection != null && resourceManagerConnection.isConnected()); } + private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) { + JobManagerConnection jmConnection =
jobManagerTable.get(jobId); + + return jmConnection != null && jmConnection.getLeaderId().equals(leaderId); + } + // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -737,11 +969,6 @@ public class TaskExecutor extends RpcEndpoint { return resourceManagerConnection; } - @VisibleForTesting - public void addUnconfirmedFreeSlotNotification(SlotID slotID) { - unconfirmedFreeSlots.add(slotID); - } - // ------------------------------------------------------------------------ // Utility classes // ------------------------------------------------------------------------ @@ -767,6 +994,44 @@ public class TaskExecutor extends RpcEndpoint { } } + private final class JobLeaderListenerImpl implements JobLeaderListener { + + @Override + public void jobManagerGainedLeadership( + final JobID jobId, + final JobMasterGateway jobManagerGateway, + final UUID jobLeaderId, + final JMTMRegistrationSuccess registrationMessage) { + runAsync(new Runnable() { + @Override + public void run() { + establishJobManagerConnection( + jobId, + jobManagerGateway, + jobLeaderId, + registrationMessage); + } + }); + } + + @Override + public void jobManagerLostLeadership(final JobID jobId, final UUID jobLeaderId) { + log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobLeaderId); + + runAsync(new Runnable() { + @Override + public void run() { + closeJobManagerConnection(jobId); + } + }); + } + + @Override + public void handleError(Throwable throwable) { + onFatalErrorAsync(throwable); + } + } + private final class TaskManagerActionsImpl implements TaskManagerActions { private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; @@ -830,5 +1095,4 @@ public class TaskExecutor extends RpcEndpoint { }); } } - } 
http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index f062b96..1ffc407 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -18,10 +18,10 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; @@ -30,9 +30,9 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskmanager.Task; -import java.util.Collection; import java.util.UUID; /** @@ -43,28 +43,31 @@ public interface TaskExecutorGateway extends RpcGateway { /** * Requests a slot from the TaskManager * - * @param slotID slot id for the request - * @param allocationID id for the request - * @param resourceManagerLeaderID current leader id of the ResourceManager + * @param slotId slot id for the request + * @param allocationId id for the request + * 
@param resourceManagerLeaderId current leader id of the ResourceManager + * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ Future requestSlot( - SlotID slotID, - AllocationID allocationID, - UUID resourceManagerLeaderID, + SlotID slotId, + JobID jobId, + AllocationID allocationId, + String targetAddress, + UUID resourceManagerLeaderId, @RpcTimeout Time timeout); /** * Submit a {@link Task} to the {@link TaskExecutor}. * * @param tdd describing the task to submit - * @param jobManagerID identifying the submitting JobManager + * @param leaderId of the job leader * @param timeout of the submit operation * @return Future acknowledge of the successful operation */ Future submitTask( TaskDeploymentDescriptor tdd, - ResourceID jobManagerID, + UUID leaderId, @RpcTimeout Time timeout); /** @@ -74,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway { * @param partitionInfos telling where the partition can be retrieved from * @return Future acknowledge if the partitions have been successfully updated */ - Future updatePartitions(ExecutionAttemptID executionAttemptID, Collection partitionInfos); + Future updatePartitions(ExecutionAttemptID executionAttemptID, Iterable partitionInfos); /** * Fail all intermediate result partitions of the given task. 
http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 2dbd9eb..53f030e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -68,11 +68,16 @@ public class TaskExecutorToResourceManagerConnection @Override protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) { + log.info("Successful registration at resource manager {} under registration id {}.", + getTargetAddress(), success.getRegistrationId()); + registrationId = success.getRegistrationId(); } @Override protected void onRegistrationFailure(Throwable failure) { + log.info("Failed to register at resource manager {}.", getTargetAddress(), failure); + taskExecutor.onFatalErrorAsync(failure); } http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index ca1d2ce..9f78682 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -99,6 +99,8 @@ public class TaskManagerRunner implements 
FatalErrorHandler { taskManagerServices.getBroadcastVariableManager(), taskManagerServices.getFileCache(), taskManagerServices.getTaskSlotTable(), + taskManagerServices.getJobManagerTable(), + taskManagerServices.getJobLeaderService(), this); } http://git-wip-us.apache.org/repos/asf/flink/blob/2b234521/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index c1728b4..7575ba3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -73,6 +73,8 @@ public class TaskManagerServices { private final BroadcastVariableManager broadcastVariableManager; private final FileCache fileCache; private final TaskSlotTable taskSlotTable; + private final JobManagerTable jobManagerTable; + private final JobLeaderService jobLeaderService; private TaskManagerServices( TaskManagerLocation taskManagerLocation, @@ -83,7 +85,9 @@ public class TaskManagerServices { TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, - TaskSlotTable taskSlotTable) { + TaskSlotTable taskSlotTable, + JobManagerTable jobManagerTable, + JobLeaderService jobLeaderService) { this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); this.memoryManager = Preconditions.checkNotNull(memoryManager); @@ -94,6 +98,8 @@ public class TaskManagerServices { this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.fileCache = Preconditions.checkNotNull(fileCache); this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); + this.jobManagerTable = 
Preconditions.checkNotNull(jobManagerTable); + this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService); } // -------------------------------------------------------------------------------------------- @@ -136,6 +142,14 @@ public class TaskManagerServices { return taskSlotTable; } + public JobManagerTable getJobManagerTable() { + return jobManagerTable; + } + + public JobLeaderService getJobLeaderService() { + return jobLeaderService; + } + // -------------------------------------------------------------------------------------------- // Static factory methods for task manager services // -------------------------------------------------------------------------------------------- @@ -190,6 +204,10 @@ public class TaskManagerServices { final TimerService timerService = new TimerService<>(new ScheduledThreadPoolExecutor(1)); final TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService); + + final JobManagerTable jobManagerTable = new JobManagerTable(); + + final JobLeaderService jobLeaderService = new JobLeaderService(resourceID); return new TaskManagerServices( taskManagerLocation, @@ -200,7 +218,9 @@ public class TaskManagerServices { taskManagerMetricGroup, broadcastVariableManager, fileCache, - taskSlotTable); + taskSlotTable, + jobManagerTable, + jobLeaderService); } /**