flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-1432] [runtime] Make memory segment release robust against concurrent modifications of the collection
Date Thu, 12 Feb 2015 11:19:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 081a7ddda -> fab1bd9dc


[FLINK-1432] [runtime] Make memory segment release robust against concurrent modifications
of the collection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fab1bd9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fab1bd9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fab1bd9d

Branch: refs/heads/master
Commit: fab1bd9dc9ab43196a3d136f63b77d4b1d58b452
Parents: 081a7dd
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Feb 11 23:34:29 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 12 12:19:04 2015 +0100

----------------------------------------------------------------------
 .../memorymanager/DefaultMemoryManager.java     | 107 ++++++++++---------
 1 file changed, 59 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fab1bd9d/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index c02c417..5f84b23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.memorymanager;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
 
 public class DefaultMemoryManager implements MemoryManager {
 	
@@ -266,8 +266,6 @@ public class DefaultMemoryManager implements MemoryManager {
 
 	@Override
 	public <T extends MemorySegment> void release(Collection<T> segments) {
-		
-		// sanity checks
 		if (segments == null) {
 			return;
 		}
@@ -279,49 +277,62 @@ public class DefaultMemoryManager implements MemoryManager {
 				throw new IllegalStateException("Memory manager has been shut down.");
 			}
 
-			final Iterator<T> segmentsIterator = segments.iterator();
-			
-			AbstractInvokable lastOwner = null;
-			Set<DefaultMemorySegment> segsForOwner = null;
+			// since concurrent modifications to the collection
+			// can disturb the release, we need to try potentially
+			// multiple times
+			boolean successfullyReleased = false;
+			do {
+				final Iterator<T> segmentsIterator = segments.iterator();
+
+				AbstractInvokable lastOwner = null;
+				Set<DefaultMemorySegment> segsForOwner = null;
 
-			// go over all segments
-			while (segmentsIterator.hasNext()) {
-				
-				final MemorySegment seg = segmentsIterator.next();
-				if (seg.isFreed()) {
-					continue;
-				}
-				
-				final DefaultMemorySegment defSeg = (DefaultMemorySegment) seg;
-				final AbstractInvokable owner = defSeg.owner;
-				
 				try {
-					// get the list of segments by this owner only if it is a different owner than for
-					// the previous one (or it is the first segment)
-					if (lastOwner != owner) {
-						lastOwner = owner;
-						segsForOwner = this.allocatedSegments.get(owner);
-					}
-					
-					// remove the segment from the list
-					if (segsForOwner != null) {
-						segsForOwner.remove(defSeg);
-						if (segsForOwner.isEmpty()) {
-							this.allocatedSegments.remove(owner);
+					// go over all segments
+					while (segmentsIterator.hasNext()) {
+
+						final MemorySegment seg = segmentsIterator.next();
+						if (seg == null || seg.isFreed()) {
+							continue;
+						}
+
+						final DefaultMemorySegment defSeg = (DefaultMemorySegment) seg;
+						final AbstractInvokable owner = defSeg.owner;
+
+						try {
+							// get the list of segments by this owner only if it is a different owner than for
+							// the previous one (or it is the first segment)
+							if (lastOwner != owner) {
+								lastOwner = owner;
+								segsForOwner = this.allocatedSegments.get(owner);
+							}
+
+							// remove the segment from the list
+							if (segsForOwner != null) {
+								segsForOwner.remove(defSeg);
+								if (segsForOwner.isEmpty()) {
+									this.allocatedSegments.remove(owner);
+								}
+							}
+						} catch (Throwable t) {
+							LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
+						} finally {
+							// release the memory in any case
+							byte[] buffer = defSeg.destroy();
+							this.freeSegments.add(buffer);
 						}
 					}
+
+					segments.clear();
+
+					// the only way to exit the loop
+					successfullyReleased = true;
 				}
-				catch (Throwable t) {
-					LOG.error("Error removing book-keeping reference to allocated memory segment.", t);
-				}
-				finally {
-					// release the memory in any case
-					byte[] buffer = defSeg.destroy();
-					this.freeSegments.add(buffer);
+				catch (ConcurrentModificationException e) {
+					// this may happen in the case where an asynchronous
+					// call releases the memory. fall through the loop and try again
 				}
-			}
-			
-			segments.clear();
+			} while (!successfullyReleased);
 		}
 		// -------------------- END CRITICAL SECTION -------------------
 	}
@@ -383,7 +394,7 @@ public class DefaultMemoryManager implements MemoryManager {
 	
 	// ------------------------------------------------------------------------
 	
-	private final int getNumPages(long numBytes) {
+	private int getNumPages(long numBytes) {
 		if (numBytes < 0) {
 			throw new IllegalArgumentException("The number of bytes to allocate must not be negative.");
 		}
@@ -392,11 +403,11 @@ public class DefaultMemoryManager implements MemoryManager {
 		if (numPages <= Integer.MAX_VALUE) {
 			return (int) numPages;
 		} else {
-			throw new IllegalArgumentException("The given number of bytes correstponds to more than
MAX_INT pages.");
+			throw new IllegalArgumentException("The given number of bytes corresponds to more than
MAX_INT pages.");
 		}
 	}
 
-	private final int getRelativeNumPages(double fraction){
+	private int getRelativeNumPages(double fraction){
 		if (fraction <= 0 || fraction > 1) {
 			throw new IllegalArgumentException("The fraction of memory to allocate must within (0,
1].");
 		}


Mime
View raw message