flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [9/9] flink git commit: [FLINK-6823] Activate checkstyle for runtime/broadcast
Date Tue, 04 Jul 2017 17:10:27 GMT
[FLINK-6823] Activate checkstyle for runtime/broadcast

This closes #4068.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d18afed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d18afed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d18afed

Branch: refs/heads/master
Commit: 4d18afed80774c57766752f8a3cc45133a4a457a
Parents: bd85884
Author: zentol <chesnay@apache.org>
Authored: Fri Jun 2 22:18:49 2017 +0200
Committer: zentol <chesnay@apache.org>
Committed: Tue Jul 4 16:08:03 2017 +0200

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../runtime/broadcast/BroadcastVariableKey.java | 24 +++--
 .../broadcast/BroadcastVariableManager.java     | 58 ++++++------
 .../BroadcastVariableMaterialization.java       | 97 ++++++++++----------
 .../DefaultBroadcastVariableInitializer.java    | 17 ++--
 .../InitializationTypeConflictException.java    | 13 ++-
 .../MaterializationExpiredException.java        |  4 +
 7 files changed, 119 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a21f753..15b502e 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -428,7 +428,6 @@ under the License.
 					<excludes>
 						**/runtime/akka/**,
 						**/runtime/blob/**,
-						**/runtime/broadcast/**,
 						**/runtime/checkpoint/**,
 						**/runtime/client/**,
 						**/runtime/clusterframework/**,       

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
index cfa87ae..372519e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableKey.java
@@ -20,47 +20,51 @@ package org.apache.flink.runtime.broadcast;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+/**
+ * An identifier for a {@link BroadcastVariableMaterialization} based on the task's {@link JobVertexID}, broadcast
+ * variable name and iteration superstep.
+ */
 public class BroadcastVariableKey {
 
 	private final JobVertexID vertexId;
-	
+
 	private final String name;
-	
+
 	private final int superstep;
 
 	public BroadcastVariableKey(JobVertexID vertexId, String name, int superstep) {
 		if (vertexId == null || name == null || superstep <= 0) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		this.vertexId = vertexId;
 		this.name = name;
 		this.superstep = superstep;
 	}
 
 	// ---------------------------------------------------------------------------------------------
-	
+
 	public JobVertexID getVertexId() {
 		return vertexId;
 	}
-	
+
 	public String getName() {
 		return name;
 	}
-	
+
 	public int getSuperstep() {
 		return superstep;
 	}
-	
+
 	// ---------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public int hashCode() {
 		return 31 * superstep +
 				47 * name.hashCode() +
 				83 * vertexId.hashCode();
 	}
-	
+
 	@Override
 	public boolean equals(Object obj) {
 		if (obj != null && obj.getClass() == BroadcastVariableKey.class) {
@@ -73,7 +77,7 @@ public class BroadcastVariableKey {
 			return false;
 		}
 	}
-	
+
 	@Override
 	public String toString() {
 		return vertexId + " \"" + name + "\" (" + superstep + ')';

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
index 7d0454e..0cee70e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -18,34 +18,42 @@
 
 package org.apache.flink.runtime.broadcast;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.operators.BatchTask;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * The BroadcastVariableManager is used to manage the materialization of broadcast variables. References to materialized
+ * broadcast variables are cached and shared between parallel subtasks. A reference count is maintained to track whether
+ * the materialization may be cleaned up.
+ */
 public class BroadcastVariableManager {
-	
+
 	private final ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?, ?>> variables =
 							new ConcurrentHashMap<BroadcastVariableKey, BroadcastVariableMaterialization<?, ?>>(16);
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
+	/**
+	 * Materializes the broadcast variable for the given name, scoped to the given task and its iteration superstep. An
+	 * existing materialization created by another parallel subtask may be returned, if it hasn't expired yet.
+	 */
 	public <T> BroadcastVariableMaterialization<T, ?> materializeBroadcastVariable(String name, int superstep, BatchTask<?, ?> holder,
-			MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException
-	{
+			MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException {
 		final BroadcastVariableKey key = new BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
-		
+
 		while (true) {
 			final BroadcastVariableMaterialization<T, Object> newMat = new BroadcastVariableMaterialization<T, Object>(key);
-			
+
 			final BroadcastVariableMaterialization<?, ?> previous = variables.putIfAbsent(key, newMat);
-			
+
 			@SuppressWarnings("unchecked")
 			final BroadcastVariableMaterialization<T, ?> materialization = (previous == null) ? newMat : (BroadcastVariableMaterialization<T, ?>) previous;
-			
+
 			try {
 				materialization.materializeVariable(reader, serializerFactory, holder);
 				return materialization;
@@ -54,13 +62,13 @@ public class BroadcastVariableManager {
 				// concurrent release. as an optimization, try to replace the previous one with our version. otherwise we might spin for a while
 				// until the releaser removes the variable
 				// NOTE: This would also catch a bug prevented an expired materialization from ever being removed, so it acts as a future safeguard
-				
+
 				boolean replaceSuccessful = false;
 				try {
 					replaceSuccessful = variables.replace(key, materialization, newMat);
 				}
 				catch (Throwable t) {}
-				
+
 				if (replaceSuccessful) {
 					try {
 						newMat.materializeVariable(reader, serializerFactory, holder);
@@ -75,29 +83,27 @@ public class BroadcastVariableManager {
 			}
 		}
 	}
-	
-	
+
 	public void releaseReference(String name, int superstep, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableKey key = new BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, superstep);
 		releaseReference(key, referenceHolder);
 	}
-	
+
 	public void releaseReference(BroadcastVariableKey key, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableMaterialization<?, ?> mat = variables.get(key);
-		
+
 		// release this reference
 		if (mat.decrementReference(referenceHolder)) {
 			// remove if no one holds a reference and no one concurrently replaced the entry
 			variables.remove(key, mat);
 		}
 	}
-	
-	
+
 	public void releaseAllReferencesFromTask(BatchTask<?, ?> referenceHolder) {
-		// go through all registered variables 
+		// go through all registered variables
 		for (Map.Entry<BroadcastVariableKey, BroadcastVariableMaterialization<?, ?>> entry : variables.entrySet()) {
 			BroadcastVariableMaterialization<?, ?> mat = entry.getValue();
-			
+
 			// release the reference
 			if (mat.decrementReferenceIfHeld(referenceHolder)) {
 				// remove if no one holds a reference and no one concurrently replaced the entry
@@ -105,10 +111,10 @@ public class BroadcastVariableManager {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public int getNumberOfVariablesWithReferences() {
 		return this.variables.size();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
index cea32e5..9c2c109 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.broadcast;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -32,54 +26,61 @@ import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 /**
+ * This class represents a single materialization of a broadcast variable and maintains a reference count for it. If the
+ * reference count reaches zero the variable is no longer accessible and will eventually be garbage-collected.
+ *
  * @param <T> The type of the elements in the broadcasted data set.
  */
 public class BroadcastVariableMaterialization<T, C> {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(BroadcastVariableMaterialization.class);
-	
-	
-	private final Set<BatchTask<?, ?>> references = new HashSet<BatchTask<?,?>>();
-	
+
+	private final Set<BatchTask<?, ?>> references = new HashSet<BatchTask<?, ?>>();
+
 	private final Object materializationMonitor = new Object();
-	
+
 	private final BroadcastVariableKey key;
-	
+
 	private ArrayList<T> data;
-	
+
 	private C transformed;
-	
+
 	private boolean materialized;
-	
+
 	private boolean disposed;
-	
-	
+
 	public BroadcastVariableMaterialization(BroadcastVariableKey key) {
 		this.key = key;
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, BatchTask<?, ?> referenceHolder)
-			throws MaterializationExpiredException, IOException
-	{
+			throws MaterializationExpiredException, IOException {
 		Preconditions.checkNotNull(reader);
 		Preconditions.checkNotNull(serializerFactory);
 		Preconditions.checkNotNull(referenceHolder);
-		
+
 		final boolean materializer;
-		
+
 		// hold the reference lock only while we track references and decide who should be the materializer
 		// that way, other tasks can de-register (in case of failure) while materialization is happening
 		synchronized (references) {
 			if (disposed) {
 				throw new MaterializationExpiredException();
 			}
-			
+
 			// sanity check
 			if (!references.add(referenceHolder)) {
 				throw new IllegalStateException(
@@ -87,64 +88,65 @@ public class BroadcastVariableMaterialization<T, C> {
 								referenceHolder.getEnvironment().getTaskInfo().getTaskNameWithSubtasks(),
 								key.toString()));
 			}
-			
+
 			materializer = references.size() == 1;
 		}
 
 		try {
 			@SuppressWarnings("unchecked")
 			final MutableReader<DeserializationDelegate<T>> typedReader = (MutableReader<DeserializationDelegate<T>>) reader;
-			
+
 			@SuppressWarnings("unchecked")
 			final TypeSerializer<T> serializer = ((TypeSerializerFactory<T>) serializerFactory).getSerializer();
 
 			final ReaderIterator<T> readerIterator = new ReaderIterator<T>(typedReader, serializer);
-			
+
 			if (materializer) {
 				// first one, so we need to materialize;
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Getting Broadcast Variable (" + key + ") - First access, materializing.");
 				}
-				
+
 				ArrayList<T> data = new ArrayList<T>();
-				
+
 				T element;
 				while ((element = readerIterator.next()) != null) {
 					data.add(element);
 				}
-				
+
 				synchronized (materializationMonitor) {
 					this.data = data;
 					this.materialized = true;
 					materializationMonitor.notifyAll();
 				}
-				
+
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Materialization of Broadcast Variable (" + key + ") finished.");
 				}
 			}
 			else {
 				// successor: discard all data and refer to the shared variable
-				
+
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Getting Broadcast Variable (" + key + ") - shared access.");
 				}
-				
+
 				T element = serializer.createInstance();
-				while ((element = readerIterator.next(element)) != null);
-				
+				while ((element = readerIterator.next(element)) != null) {
+				}
+
 				synchronized (materializationMonitor) {
 					while (!this.materialized && !disposed) {
 						materializationMonitor.wait();
 					}
 				}
-				
+
 			}
 		}
 		catch (Throwable t) {
 			// in case of an exception, we need to clean up big time
 			decrementReferenceIfHeld(referenceHolder);
-			
+
 			if (t instanceof IOException) {
 				throw (IOException) t;
 			} else {
@@ -152,15 +154,15 @@ public class BroadcastVariableMaterialization<T, C> {
 			}
 		}
 	}
-	
+
 	public boolean decrementReference(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, true);
 	}
-	
+
 	public boolean decrementReferenceIfHeld(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, false);
 	}
-	
+
 	private boolean decrementReferenceInternal(BatchTask<?, ?> referenceHolder, boolean errorIfNoReference) {
 		synchronized (references) {
 			if (disposed || references.isEmpty()) {
@@ -170,7 +172,7 @@ public class BroadcastVariableMaterialization<T, C> {
 					return false;
 				}
 			}
-			
+
 			if (!references.remove(referenceHolder)) {
 				if (errorIfNoReference) {
 					throw new IllegalStateException(
@@ -181,8 +183,7 @@ public class BroadcastVariableMaterialization<T, C> {
 					return false;
 				}
 			}
-			
-			
+
 			if (references.isEmpty()) {
 				disposed = true;
 				data = null;
@@ -193,9 +194,9 @@ public class BroadcastVariableMaterialization<T, C> {
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	public List<T> getVariable() throws InitializationTypeConflictException {
 		if (!materialized) {
 			throw new IllegalStateException("The Broadcast Variable has not yet been materialized.");
@@ -203,7 +204,7 @@ public class BroadcastVariableMaterialization<T, C> {
 		if (disposed) {
 			throw new IllegalStateException("The Broadcast Variable has been disposed");
 		}
-		
+
 		synchronized (references) {
 			if (transformed != null) {
 				if (transformed instanceof List) {
@@ -219,7 +220,7 @@ public class BroadcastVariableMaterialization<T, C> {
 			}
 		}
 	}
-	
+
 	public C getVariable(BroadcastVariableInitializer<T, C> initializer) {
 		if (!materialized) {
 			throw new IllegalStateException("The Broadcast Variable has not yet been materialized.");
@@ -227,7 +228,7 @@ public class BroadcastVariableMaterialization<T, C> {
 		if (disposed) {
 			throw new IllegalStateException("The Broadcast Variable has been disposed");
 		}
-		
+
 		synchronized (references) {
 			if (transformed == null) {
 				transformed = initializer.initializeBroadcastVariable(data);

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
index f18c431..dfa8bcd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/DefaultBroadcastVariableInitializer.java
@@ -18,31 +18,34 @@
 
 package org.apache.flink.runtime.broadcast;
 
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-
+/**
+ * The default {@link BroadcastVariableInitializer} implementation that initializes the broadcast variable into a list.
+ */
 public class DefaultBroadcastVariableInitializer<T> implements BroadcastVariableInitializer<T, List<T>> {
 
 	@Override
 	public List<T> initializeBroadcastVariable(Iterable<T> data) {
 		ArrayList<T> list = new ArrayList<T>();
-		
+
 		for (T value : data) {
 			list.add(value);
 		}
 		return list;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static final DefaultBroadcastVariableInitializer<Object> INSTANCE = new DefaultBroadcastVariableInitializer<Object>();
-	
+
 	@SuppressWarnings("unchecked")
 	public static <E> DefaultBroadcastVariableInitializer<E> instance() {
 		return (DefaultBroadcastVariableInitializer<E>) INSTANCE;
 	}
-	
+
 	private DefaultBroadcastVariableInitializer() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/InitializationTypeConflictException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/InitializationTypeConflictException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/InitializationTypeConflictException.java
index f478210..b78f395 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/InitializationTypeConflictException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/InitializationTypeConflictException.java
@@ -18,16 +18,23 @@
 
 package org.apache.flink.runtime.broadcast;
 
+import org.apache.flink.api.common.functions.RuntimeContext;
+
+/**
+ * Indicates that a broadcast variable was initialized with a {@link DefaultBroadcastVariableInitializer} as a
+ * non-{@link java.util.List} type, and later accessed using {@link RuntimeContext#getBroadcastVariable(String)} which
+ * may only return lists.
+ */
 public class InitializationTypeConflictException extends Exception {
-	
+
 	private static final long serialVersionUID = -3930913982433642882L;
-	
+
 	private final Class<?> type;
 
 	public InitializationTypeConflictException(Class<?> type) {
 		this.type = type;
 	}
-	
+
 	public Class<?> getType() {
 		return type;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d18afed/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
index 45f4f47..31b60f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/MaterializationExpiredException.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.runtime.broadcast;
 
+/**
+ * Indicates that the {@link BroadcastVariableMaterialization} has materialized the broadcast variable at some point
+ * but discarded it already.
+ */
 public class MaterializationExpiredException extends Exception {
 	private static final long serialVersionUID = 7476456353634121934L;
 }


Mime
View raw message