flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-986) Add intermediate results to distributed runtime
Date Tue, 09 Dec 2014 18:56:12 GMT

    [ https://issues.apache.org/jira/browse/FLINK-986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14239838#comment-14239838
] 

ASF GitHub Bot commented on FLINK-986:
--------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/254#discussion_r21552080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
---
    @@ -111,111 +89,176 @@
     	/** The I/O manager of the current environment (currently the one associated with the
executing TaskManager). */
     	private final IOManager ioManager;
     
    -	/** The input split provider that can be queried for new input splits.  */
    +	/** The input split provider that can be queried for new input splits. */
     	private final InputSplitProvider inputSplitProvider;
     
    -	
     	/** The thread executing the task in the environment. */
     	private Thread executingThread;
     
    -	/**
    -	 * The RPC proxy to report accumulators to JobManager
    -	 */
    +	/** The RPC proxy to report accumulators to JobManager. */
     	private final AccumulatorProtocol accumulatorProtocolProxy;
     
    -	private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String,
FutureTask<Path>>();
    -	
    -	private final BroadcastVariableManager bcVarManager;
    -	
    -	private LocalBufferPool outputBufferPool;
    -	
    +	/** The network environment of the task manager */
    +	private final NetworkEnvironment networkEnvironment;
    +
    +	private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String,
FutureTask<Path>>();
    +
    +	private final IntermediateResultPartitionManager partitionManager;
    +
    +	private final TaskEventDispatcher taskEventDispatcher;
    +
     	private AtomicBoolean canceled = new AtomicBoolean();
     
    +	private BufferWriter[] writers;
    +
    +	private BufferReader[] readers;
    +
    +	private Map<IntermediateDataSetID, BufferReader> readersById = new HashMap<IntermediateDataSetID,
BufferReader>();
    +
    +	private IntermediateResultPartition[] producedPartitions;
    +
    +	public RuntimeEnvironment(
    +			Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader,
    +			MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider,
    +			AccumulatorProtocol accumulatorProtocolProxy, NetworkEnvironment networkEnvironment)
throws Exception {
    +
    +		this.owner = checkNotNull(owner);
    +
    +		this.memoryManager = checkNotNull(memoryManager);
    +		this.ioManager = checkNotNull(ioManager);
    +		this.inputSplitProvider = checkNotNull(inputSplitProvider);
    +		this.accumulatorProtocolProxy = checkNotNull(accumulatorProtocolProxy);
    +		this.partitionManager = checkNotNull(networkEnvironment.getPartitionManager());
    +		this.taskEventDispatcher = checkNotNull(networkEnvironment.getTaskEventDispatcher());
    +
    +		this.networkEnvironment = checkNotNull(networkEnvironment);
    +
    +		boolean success = false;
     
    -	public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
    -							ClassLoader userCodeClassLoader,
    -							MemoryManager memoryManager, IOManager ioManager,
    -							InputSplitProvider inputSplitProvider,
    -							AccumulatorProtocol accumulatorProtocolProxy,
    -							BroadcastVariableManager bcVarManager)
    -		throws Exception
    -	{
    -		Preconditions.checkNotNull(owner);
    -		Preconditions.checkNotNull(memoryManager);
    -		Preconditions.checkNotNull(ioManager);
    -		Preconditions.checkNotNull(inputSplitProvider);
    -		Preconditions.checkNotNull(accumulatorProtocolProxy);
    -		Preconditions.checkNotNull(userCodeClassLoader);
    -		Preconditions.checkNotNull(bcVarManager);
    -		
    -		this.owner = owner;
    -
    -		this.memoryManager = memoryManager;
    -		this.ioManager = ioManager;
    -		this.inputSplitProvider = inputSplitProvider;
    -		this.accumulatorProtocolProxy = accumulatorProtocolProxy;
    -		this.bcVarManager = bcVarManager;
    -
    -		// load and instantiate the invokable class
    -		this.userCodeClassLoader = userCodeClassLoader;
    -		try {
    -			final String className = tdd.getInvokableClassName();
    -			this.invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
    -		}
    -		catch (Throwable t) {
    -			throw new Exception("Could not load invokable class.", t);
    -		}
    -		
     		try {
    -			this.invokable = this.invokableClass.newInstance();
    -		}
    -		catch (Throwable t) {
    -			throw new Exception("Could not instantiate the invokable class.", t);
    -		}
    -		
    -		this.jobConfiguration = tdd.getJobConfiguration();
    -		this.taskConfiguration = tdd.getTaskConfiguration();
    -		
    -		this.invokable.setEnvironment(this);
    -		
    -		// make sure that user classloader is available, because registerInputOutput might
call usercode
    -		{
    -			Thread currentThread = Thread.currentThread();
    -			ClassLoader context = currentThread.getContextClassLoader();
    -			currentThread.setContextClassLoader(userCodeClassLoader);
    +			// ----------------------------------------------------------------
    +			// Produced intermediate result partitions and writers
    +			// ----------------------------------------------------------------
    +
    +			List<IntermediateResultPartitionDeploymentDescriptor> irpdd = tdd.getProducedPartitions();
    +
    +			this.producedPartitions = new IntermediateResultPartition[irpdd.size()];
    +			this.writers = new BufferWriter[irpdd.size()];
    +
    +			for (int i = 0; i < producedPartitions.length; i++) {
    +				IntermediateResultPartitionDeploymentDescriptor irp = irpdd.get(i);
    +
    +				producedPartitions[i] = new IntermediateResultPartition(i, getJobID(), tdd.getExecutionId(),
networkEnvironment, irp);
    +
    +				writers[i] = new BufferWriter(producedPartitions[i]);
    +
    +				partitionManager.registerIntermediateResultPartition(producedPartitions[i]);
    +			}
    +
    +			registerAllWritersWithTaskEventDispatcher();
    +
    +			// ----------------------------------------------------------------
    +			// Consumed intermediate result partition readers
    +			// ----------------------------------------------------------------
    +
    +			List<IntermediateResultPartitionConsumerDeploymentDescriptor> consumedPartitions
= tdd.getConsumedPartitions();
    +
    +			this.readers = new BufferReader[consumedPartitions.size()];
    +
    +			for (int i = 0; i < consumedPartitions.size(); i++) {
    +				IntermediateResultPartitionConsumerDeploymentDescriptor cdd = consumedPartitions.get(i);
    +
    +				// There is one input channel per partition of the consumed
    +				// result. Each of these input channels consumes the queue
    +				// matching the index of this subtask (or queue 0 for
    +				// forwarding distribution patterns). In cases where the number
    +				// of queues and subtasks does not match, there needs to be
    +				// a separate repartitioning task.
    +
    +				IntermediateResultPartitionInfo[] partitions = cdd.getPartitions();
    +				InputChannel[] inputChannels = new InputChannel[partitions.length];
    +
    +				IntermediateDataSetID resultId = cdd.getResultId();
    +				int queueToRequest = cdd.getQueueToRequest();
    +
    +				BufferReader reader = new BufferReader(resultId, owner, networkEnvironment, inputChannels.length,
queueToRequest);
    +				readersById.put(resultId, reader);
    +
    +				for (int j = 0; j < partitions.length; j++) {
    +					IntermediateResultPartitionInfo partitionInfo = partitions[j];
    +					IntermediateResultPartitionID partitionId = partitionInfo.getPartitionId();
    +
    +					IntermediateResultPartitionLocation location = partitionInfo.getProducerLocation();
    +
    +					switch (location) {
    +						case LOCAL:
    +							inputChannels[j] = new LocalInputChannel(j, partitionId, reader);
    +							break;
    +						case REMOTE:
    +							inputChannels[j] = new RemoteInputChannel(j, partitionId, reader, partitionInfo.getProducerAddress());
    +							break;
    +						case UNKNOWN:
    +							inputChannels[j] = new UnknownInputChannel(j, partitionId, reader);
    +							break;
    +					}
    +
    +					reader.setInputChannel(partitionId, inputChannels[j]);
    +				}
    +
    +				readers[i] = reader;
    +			}
    +
    +			// ----------------------------------------------------------------
    +			// Invokable setup
    +			// ----------------------------------------------------------------
    +			// Note: This has to be done *after* the readers and writers have
    +			// been setup, because the invokable relies on them for I/O.
    +			// ----------------------------------------------------------------
    +
    +			// load and instantiate the invokable class
    +			this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
    +			/* Class of the task to run in this environment. */
    +			Class<? extends AbstractInvokable> invokableClass;
     			try {
    -				this.invokable.registerInputOutput();
    +				final String className = tdd.getInvokableClassName();
    +				invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
     			}
    -			finally {
    -				currentThread.setContextClassLoader(context);
    +			catch (Throwable t) {
    +				throw new Exception("Could not load invokable class.", t);
     			}
    -		}
     
    -		List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
    -		List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
    -		
    -		
    -		if (this.inputGates.size() != inGates.size()) {
    -			throw new Exception("The number of readers created in 'registerInputOutput()' "
    -					+ "is different than the number of connected incoming edges in the job graph.");
    -		}
    -		if (this.outputGates.size() != outGates.size()) {
    -			throw new Exception("The number of writers created in 'registerInputOutput()' "
    -					+ "is different than the number of connected outgoing edges in the job graph.");
    +			try {
    +				this.invokable = invokableClass.newInstance();
    +			}
    +			catch (Throwable t) {
    +				throw new Exception("Could not instantiate the invokable class.", t);
    +			}
    +
    +			this.jobConfiguration = tdd.getJobConfiguration();
    +			this.taskConfiguration = tdd.getTaskConfiguration();
    +
    +			this.invokable.setEnvironment(this);
    +			this.invokable.registerInputOutput();
    +
    +			success = true;
     		}
    -		
    -		for (int i = 0; i < inGates.size(); i++) {
    -			this.inputGates.get(i).initializeChannels(inGates.get(i));
    +		catch (Throwable t) {
    +			LOG.error(ExceptionUtils.stringifyException(t), t);
    --- End diff --
    
    I think the `stringifyException` call is unnecessary and even hinders proper logging.
I would remove it, construct a proper error message for the log, and pass the exception as
the second parameter.


> Add intermediate results to distributed runtime
> -----------------------------------------------
>
>                 Key: FLINK-986
>                 URL: https://issues.apache.org/jira/browse/FLINK-986
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Runtime
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>            Priority: Blocker
>
> The lack of support for intermediate results in the runtime is currently blocking other efforts,
such as fault tolerance and result collection at the client.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message