flink-issues mailing list archives

From ggevay <...@git.apache.org>
Subject [GitHub] flink pull request #2496: FLINK-4615 Reusing the memory allocated for the dr...
Date Thu, 15 Sep 2016 16:18:30 GMT
GitHub user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2496#discussion_r79003843
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java ---
    @@ -82,43 +82,46 @@ protected void initialize() throws Exception {
     	public void run() throws Exception {
     		
     		SuperstepKickoffLatch nextSuperstepLatch = SuperstepKickoffLatchBroker.instance().get(brokerKey());
    +		try {
    +			while (this.running && !terminationRequested()) {
     
    -		while (this.running && !terminationRequested()) {
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("starting iteration [" + currentIteration() + "]"));
    -			}
    +				super.run();
     
    -			super.run();
    +				// check if termination was requested
    +				verifyEndOfSuperstepState();
     
    -			// check if termination was requested
    -			verifyEndOfSuperstepState();
    +				if (isWorksetUpdate && isWorksetIteration) {
    +					long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    +					worksetAggregator.aggregate(numCollected);
    +				}
     
    -			if (isWorksetUpdate && isWorksetIteration) {
    -				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
    -				worksetAggregator.aggregate(numCollected);
    -			}
    -			
    -			if (log.isInfoEnabled()) {
    -				log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    -			}
    -			
    -			// let the successors know that the end of this superstep data is reached
    -			sendEndOfSuperstep();
    -			
    -			if (isWorksetUpdate) {
    -				// notify iteration head if responsible for workset update
    -				worksetBackChannel.notifyOfEndOfSuperstep();
    -			}
    -			
    -			boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +				if (log.isInfoEnabled()) {
    +					log.info(formatLogString("finishing iteration [" + currentIteration() + "]"));
    +				}
     
    -			if (terminated) {
    -				requestTermination();
    -			}
    -			else {
    -				incrementIterationCounter();
    +				// let the successors know that the end of this superstep data is reached
    +				sendEndOfSuperstep();
    +
    +				if (isWorksetUpdate) {
    +					// notify iteration head if responsible for workset update
    +					worksetBackChannel.notifyOfEndOfSuperstep();
    +				}
    +
    +				boolean terminated = nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
    +
    +				if (terminated) {
    +					requestTermination();
    +					this.driver.cleanup();
    --- End diff --
    
    `cleanup` will be called twice on the last superstep of the iteration: once here, and again in the `finally` block.
    This could cause problems if any driver assumes that `cleanup` is called only once.
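
To illustrate the concern, here is a minimal, self-contained sketch. Every name in it (`SketchDriver`, `CountingDriver`, `CleanupOnceDriver`, `runLastSuperstep`) is a hypothetical stand-in, not Flink's actual driver API. The control flow in `runLastSuperstep` only approximates the shape of the diff, where `cleanup` is called on termination and then again in the `finally` block. The guard shown is one possible mitigation, not necessarily what the pull request should adopt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; none of these types are part of Flink.
final class CleanupTwiceSketch {

    /** Stand-in for a driver that exposes a cleanup hook. */
    interface SketchDriver {
        void cleanup();
    }

    /** Records how many times cleanup was invoked. */
    static final class CountingDriver implements SketchDriver {
        int cleanupCalls = 0;

        @Override
        public void cleanup() {
            cleanupCalls++;
        }
    }

    /** Wrapper that forwards only the first cleanup call to the delegate. */
    static final class CleanupOnceDriver implements SketchDriver {
        private final SketchDriver delegate;
        private final AtomicBoolean cleaned = new AtomicBoolean(false);

        CleanupOnceDriver(SketchDriver delegate) {
            this.delegate = delegate;
        }

        @Override
        public void cleanup() {
            // compareAndSet succeeds exactly once, so later calls are no-ops.
            if (cleaned.compareAndSet(false, true)) {
                delegate.cleanup();
            }
        }
    }

    /** Approximates the last superstep: cleanup on termination, then again in finally. */
    static void runLastSuperstep(SketchDriver driver) {
        try {
            boolean terminated = true; // assume the latch reported termination
            if (terminated) {
                driver.cleanup();
            }
        } finally {
            driver.cleanup();
        }
    }

    public static void main(String[] args) {
        CountingDriver unguarded = new CountingDriver();
        runLastSuperstep(unguarded);
        System.out.println("unguarded cleanup calls: " + unguarded.cleanupCalls); // prints 2

        CountingDriver guarded = new CountingDriver();
        runLastSuperstep(new CleanupOnceDriver(guarded));
        System.out.println("guarded cleanup calls: " + guarded.cleanupCalls); // prints 1
    }
}
```

A simpler alternative, assuming the `finally` block always runs `cleanup`, would be to drop the explicit call in the `if (terminated)` branch so that no guard is needed.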

