flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #2594: [FLINK-4738] [TaskManager] Port TaskManager logic ...
Date Mon, 10 Oct 2016 08:42:59 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2594#discussion_r82563262
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
    @@ -127,12 +195,423 @@ public void start() {
     		}
     	}
     
    +	/**
    +	 * Called to shut down the TaskManager. The method closes all TaskManager services.
    +	 */
    +	@Override
    +	public void shutDown() {
    +		log.info("Stopping TaskManager {}.", getAddress());
    +
    +		if (resourceManagerConnection.isConnected()) {
    +			try {
    +				resourceManagerConnection.close();
    +			} catch (Exception e) {
    +				log.error("Could not cleanly close the ResourceManager connection.", e);
    +			}
    +		}
    +
    +		try {
    +			ioManager.shutdown();
    +		} catch (Exception e) {
    +			log.error("IOManager did not shut down properly.", e);
    +		}
    +
    +		try {
    +			memoryManager.shutdown();
    +		} catch (Exception e) {
    +			log.error("MemoryManager did not shut down properly.", e);
    +		}
    +
    +		try {
    +			networkEnvironment.shutdown();
    +		} catch (Exception e) {
    +			log.error("Network environment did not shut down properly.", e);
    +		}
    +
    +		try {
    +			fileCache.shutdown();
    +		} catch (Exception e) {
    +			log.error("File cache did not shut down properly.", e);
    +		}
    +
    +		try {
    +			metricRegistry.shutdown();
    +		} catch (Exception e) {
    +			log.error("MetricRegistry did not shut down properly.", e);
    +		}
    +
    +		log.info("Stopped TaskManager {}.", getAddress());
    +	}
    +
    +	// ========================================================================
    +	//  RPC methods
    +	// ========================================================================
    +
    +	// ----------------------------------------------------------------------
    +	// Task lifecycle RPCs
    +	// ----------------------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID jobManagerID) throws TaskSubmissionException {
    +
    +		JobManagerConnection jobManagerConnection = getJobManagerConnection(jobManagerID);
    +
    +		if (jobManagerConnection == null) {
    +			final String message = "Could not submit task because JobManager " + jobManagerID +
    +				" was not associated.";
    +
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +
    +		TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
    +
    +		if (taskSlot == null) {
    +			final String message = "No task slot allocated for allocation ID " + tdd.getAllocationID() + '.';
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +
    +		TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(tdd);
    +
    +		InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(
    +			jobManagerConnection.getJobManagerGateway(),
    +			tdd.getJobID(),
    +			tdd.getVertexID(),
    +			tdd.getExecutionId(),
    +			taskManagerConfiguration.getTimeout());
    +
    +		TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions();
    +		CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder();
    +		LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager();
    +		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier();
    +		PartitionStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();
    +
    +		Task task = new Task(
    +			tdd,
    +			memoryManager,
    +			ioManager,
    +			networkEnvironment,
    +			broadcastVariableManager,
    +			taskManagerActions,
    +			inputSplitProvider,
    +			checkpointResponder,
    +			libraryCache,
    +			fileCache,
    +			taskManagerRuntimeInfo,
    +			taskMetricGroup,
    +			resultPartitionConsumableNotifier,
    +			partitionStateChecker,
    +			getRpcService().getExecutor());
    +
    +		log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());
    +
    +		if(taskSlot.add(task)) {
    +			TaskSlotMapping taskSlotMapping = new TaskSlotMapping(task, taskSlot);
    +
    +			taskSlotMappings.put(task.getExecutionId(), taskSlotMapping);
    +			task.startTaskThread();
    +
    +			return Acknowledge.get();
    +		} else {
    +			final String message = "TaskManager already contains a task for id " +
    +				task.getExecutionId() + '.';
    +
    +			log.debug(message);
    +			throw new TaskSubmissionException(message);
    +		}
    +	}
    +
    +	@RpcMethod
    +	public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			try {
    +				task.cancelExecution();
    +				return Acknowledge.get();
    +			} catch (Throwable t) {
    +				throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', t);
    +			}
    +		} else {
    +			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
    +
    +			log.debug(message);
    +			throw new TaskException(message);
    +		}
    +	}
    +
    +	@RpcMethod
    +	public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			try {
    +				task.stopExecution();
    +				return Acknowledge.get();
    +			} catch (Throwable t) {
    +				throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', t);
    +			}
    +		} else {
    +			final String message = "Cannot find task to stop for execution " + executionAttemptID + '.';
    +
    +			log.debug(message);
    +			throw new TaskException(message);
    +		}
    +	}
    +
    +	// ----------------------------------------------------------------------
    +	// Partition lifecycle RPCs
    +	// ----------------------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Collection<PartitionInfo> partitionInfos) throws PartitionException {
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			for (final PartitionInfo partitionInfo: partitionInfos) {
    +				IntermediateDataSetID intermediateResultPartitionID = partitionInfo.getIntermediateDataSetID();
    +
    +				final SingleInputGate singleInputGate = task.getInputGateById(intermediateResultPartitionID);
    +
    +				if (singleInputGate != null) {
    +					// Run asynchronously because it might be blocking
    +					getRpcService().execute(new Runnable() {
    +						@Override
    +						public void run() {
    +							try {
    +								singleInputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
    +							} catch (IOException | InterruptedException e) {
    +								log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), e);
    +
    +								try {
    +									task.failExternally(e);
    +								} catch (RuntimeException re) {
    +									// TODO: Check whether we need this or make exception in failExtenally checked
    +									log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, re);
    +								}
    +							}
    +						}
    +					});
    +				} else {
    +					throw new PartitionException("No reader with ID " +
    +						intermediateResultPartitionID + " for task " + executionAttemptID +
    +						" was found.");
    +				}
    +			}
    +
    +			return Acknowledge.get();
    +		} else {
    +			log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
    +			return Acknowledge.get();
    +		}
    +	}
    +
    +	@RpcMethod
    +	public void failPartition(ExecutionAttemptID executionAttemptID) {
    +		log.info("Discarding the results produced by task execution {}.", executionAttemptID);
    +
    +		try {
    +			networkEnvironment.getResultPartitionManager().releasePartitionsProducedBy(executionAttemptID);
    +		} catch (Throwable t) {
    +			// TODO: Do we still need this catch branch?
    +			onFatalError(t);
    +		}
    +
    +		// TODO: Maybe it's better to return an Acknowledge here to notify the JM about the success/failure with an Exception
    +	}
    +
    +	// ----------------------------------------------------------------------
    +	// Checkpointing RPCs
    +	// ----------------------------------------------------------------------
    +
    +	@RpcMethod
    +	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
    +		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
    +
    +		final Task task = getTask(executionAttemptID);
    +
    +		if (task != null) {
    +			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
    +
    +			return Acknowledge.get();
    +		} else {
    +			final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
    +
    +			log.debug(message);
    +			throw new CheckpointException(message);
    +		}
    +	}
    +
    --- End diff --
    
    Good catch. Will fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message