flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4615) Reusing the memory allocated for the drivers and iterators
Date Thu, 15 Sep 2016 16:32:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15493841#comment-15493841 ]

ASF GitHub Bot commented on FLINK-4615:
---------------------------------------

Github user ramkrish86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79006658
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
     	public void run() throws Exception {
     		
     		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +		try {
    +			while (this.running && !terminationRequested()) {
     
    -		while (this.running && !terminationRequested()) {
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    -			}
    +				super.run();
     
    -			super.run();
    +				// check if termination was requested
    +				verifyEndOfSuperstepState();
     
    -			// check if termination was requested
    -			verifyEndOfSuperstepState();
    +				if (isWorksetUpdate && isWorksetIteration) {
    +					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +					worksetAggregator.aggregate(numCollected);
    +				}
     
    -			if (isWorksetUpdate && isWorksetIteration) {
    -				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -				worksetAggregator.aggregate(numCollected);
    -			}
    -			
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    -			}
    -			
    -			// let the successors know that the end of this superstep data is reached
    -			sendEndOfSuperstep();
    -			
    -			if (isWorksetUpdate) {
    -				// notify iteration head if responsible for workset update
    -				worksetBackChannel.notifyOfEndOfSuperstep();
    -			}
    -			
    -			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (terminated) {
    -				requestTermination();
    -			}
    -			else {
    -				incrementIterationCounter();
    +				// let the successors know that the end of this superstep data is reached
    +				sendEndOfSuperstep();
    +
    +				if (isWorksetUpdate) {
    +					// notify iteration head if responsible for workset update
    +					worksetBackChannel.notifyOfEndOfSuperstep();
    +				}
    +
    +				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +				if (terminated) {
    +					requestTermination();
    +					this.driver.cleanup();
    --- End diff --
    
    I am very sorry, that was an oversight. I forgot to remove the cleanup() call that is made
    after termination is requested.
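
    For illustration only, here is a minimal sketch of what removing that call could look like. It assumes the driver is meant to be cleaned up once when the task ends; the diff above opens a try block, but the part that closes it is not shown, so the finally placement is a guess. CountingDriver and SuperstepLoopSketch are hypothetical stand-ins and not Flink classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the task's driver; it only counts cleanup() calls. */
final class CountingDriver {
    private final AtomicInteger cleanups = new AtomicInteger();

    void cleanup() {
        cleanups.incrementAndGet();
    }

    int cleanupCount() {
        return cleanups.get();
    }
}

/** Simplified model of the superstep loop; not the real IterationIntermediateTask. */
final class SuperstepLoopSketch {
    private final CountingDriver driver;
    private final int maxSupersteps; // assumption: termination is signalled after this many supersteps
    private boolean terminationRequested;
    private int currentIteration = 1;

    SuperstepLoopSketch(CountingDriver driver, int maxSupersteps) {
        this.driver = driver;
        this.maxSupersteps = maxSupersteps;
    }

    void run() {
        try {
            while (!terminationRequested) {
                // ... the per-superstep work (super.run() in the diff) would go here ...

                // Stands in for nextSuperstepLatch.awaitStartOfSuperstepOrTermination(...)
                boolean terminated = currentIteration >= maxSupersteps;

                if (terminated) {
                    // Only request termination; no driver.cleanup() in this branch.
                    terminationRequested = true;
                } else {
                    currentIteration++;
                }
            }
        } finally {
            // Assumed single cleanup point, reached on normal exit and on exceptions.
            driver.cleanup();
        }
    }

    int currentIteration() {
        return currentIteration;
    }
}
```

    With the call confined to one place, the driver is cleaned up exactly once per task run, however many supersteps are executed.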


> Reusing the memory allocated for the drivers and iterators
> ----------------------------------------------------------
>
>                 Key: FLINK-4615
>                 URL: https://issues.apache.org/jira/browse/FLINK-4615
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Local Runtime
>            Reporter: ramkrishna.s.vasudevan
>            Assignee: ramkrishna.s.vasudevan
>             Fix For: 1.0.0
>
>
> Raising this as a sub-task so that it can be committed individually and reviewed more closely.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
