flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4657) Implement HighAvailabilityServices based on zookeeper
Date Tue, 27 Sep 2016 17:03:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15526751#comment-15526751
] 

ASF GitHub Bot commented on FLINK-4657:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2550#discussion_r80740850
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
    @@ -467,6 +487,128 @@ public void registerAtResourceManager(final String address) {
     		//TODO:: register at the RM
     	}
     
    +	@RpcMethod
    +	public NextInputSplit requestNextInputSplit(final JobVertexID vertexID, final ExecutionAttemptID
executionAttempt) {
    +		final byte[] serializedInputSplit;
    +
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
    +		if (execution == null) {
    +			log.error("Can not find Execution for attempt {}.", executionAttempt);
    +			return null;
    +		} else {
    +			final Slot slot = execution.getAssignedResource();
    +			final int taskId = execution.getVertex().getParallelSubtaskIndex();
    +			final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
    +
    +			final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
    +			if (vertex != null) {
    +				final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
    +				if (splitAssigner != null) {
    +					final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);
    +
    +					log.debug("Send next input split {}.", nextInputSplit);
    +					try {
    +						serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
    +					} catch (Exception ex) {
    +						log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(),
ex);
    +						vertex.fail(new RuntimeException("Could not serialize the next input split of class
" +
    +							nextInputSplit.getClass() + ".", ex));
    +						return null;
    +					}
    +				} else {
    +					log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
    +					return null;
    +				}
    +			} else {
    +				log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
    +				return null;
    +			}
    +		}
    +		return new NextInputSplit(serializedInputSplit);
    +	}
    +
    +	@RpcMethod
    +	public PartitionState requestPartitionState(
    +		final ResultPartitionID partitionId,
    +		final ExecutionAttemptID taskExecutionId,
    +		final IntermediateDataSetID taskResultId)
    +	{
    +		final Execution execution = executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
    +		final ExecutionState state = execution != null ? execution.getState() : null;
    +		return new PartitionState(taskExecutionId, taskResultId, partitionId.getPartitionId(),
state);
    +	}
    +
    +	@RpcMethod
    +	public void jobStatusChanged(final JobStatus newJobStatus, long timestamp, final Throwable
error) {
    +		final JobID jobID = executionGraph.getJobID();
    +		final String jobName = executionGraph.getJobName();
    +		log.info("Status of job {} ({}) changed to {}.", jobID, jobName, newJobStatus, error);
    +
    +		if (newJobStatus.isGloballyTerminalState()) {
    +			// TODO set job end time in JobInfo
    +
    +			/*
    +			  TODO
    +			  if (jobInfo.sessionAlive) {
    +                jobInfo.setLastActive()
    +                val lastActivity = jobInfo.lastActive
    +                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds)
{
    +                  // remove only if no activity occurred in the meantime
    +                  if (lastActivity == jobInfo.lastActive) {
    +                    self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend
= true))
    +                  }
    +                }(context.dispatcher)
    +              } else {
    +                self ! decorateMessage(RemoveJob(jobID, removeJobFromStateBackend = true))
    +              }
    +			 */
    +
    +			if (newJobStatus == JobStatus.FINISHED) {
    +				try {
    +					final Map<String, SerializedValue<Object>> accumulatorResults =
    +						executionGraph.getAccumulatorsSerialized();
    +					final SerializedJobExecutionResult result = new SerializedJobExecutionResult(
    +						jobID, 0, accumulatorResults // TODO get correct job duration
    +					);
    +					jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
    +				} catch (Exception e) {
    +					log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
    +					final JobExecutionException exception = new JobExecutionException(
    +						jobID, "Failed to retrieve accumulator results.", e);
    +					// TODO should we also notify client?
    +					jobCompletionActions.jobFailed(exception);
    +				}
    +			}
    +			else if (newJobStatus == JobStatus.CANCELED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job was cancelled.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else if (newJobStatus == JobStatus.FAILED) {
    +				final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, "Job execution failed.", unpackedError);
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +			}
    +			else {
    +				final JobExecutionException exception = new JobExecutionException(
    +					jobID, newJobStatus + " is not a terminal state.");
    +				// TODO should we also notify client?
    +				jobCompletionActions.jobFailed(exception);
    +				throw new RuntimeException(exception);
    +			}
    +		}
    +	}
    +
    +	@RpcMethod
    +	public void scheduleOrUpdateConsumers(final ResultPartitionID partitionID) {
    --- End diff --
    
    Is that method needed as an RPC method? Who would call it?


> Implement HighAvailabilityServices based on zookeeper
> -----------------------------------------------------
>
>                 Key: FLINK-4657
>                 URL: https://issues.apache.org/jira/browse/FLINK-4657
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Kurt Young
>            Assignee: Kurt Young
>
> For flip-6, we will have ResourceManager and every JobManager as potential leader contenders
and retrievers. We should separate them by using different ZooKeeper paths. 
> For example, the path could be /leader/resource-manager for the RM. And for each JM, the
path could be /leader/job-managers/JobID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message