flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-5421] Deduplicate code in StateHandle Iterators
Date Thu, 12 Jan 2017 12:23:33 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3f7f17285 -> 499aea0d8


[FLINK-5421] Deduplicate code in StateHandle Iterators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/499aea0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/499aea0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/499aea0d

Branch: refs/heads/master
Commit: 499aea0d834ad8e8ef34fb80ffd89b6482062e37
Parents: aaf8e09
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Wed Jan 11 15:36:04 2017 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Jan 12 11:26:38 2017 +0100

----------------------------------------------------------------------
 .../state/StateInitializationContextImpl.java   | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/499aea0d/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index be59a2a..46445d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -136,35 +136,31 @@ public class StateInitializationContextImpl implements StateInitializationContex
 		IOUtils.closeQuietly(closableRegistry);
 	}
 
-	private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider>
{
+	private static class KeyGroupStreamIterator
+			extends AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, KeyGroupsStateHandle>
{
 
-		private final Iterator<KeyGroupsStateHandle> stateHandleIterator;
-		private final CloseableRegistry closableRegistry;
-
-		private KeyGroupsStateHandle currentStateHandle;
-		private FSDataInputStream currentStream;
 		private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;
 
 		public KeyGroupStreamIterator(
 				Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry)
{
 
-			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
-			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+			super(stateHandleIterator, closableRegistry);
 		}
 
 		@Override
 		public boolean hasNext() {
+
 			if (null != currentStateHandle && currentOffsetsIterator.hasNext()) {
+
 				return true;
 			}
 
+			closeCurrentStream();
+
 			while (stateHandleIterator.hasNext()) {
 				currentStateHandle = stateHandleIterator.next();
 				if (currentStateHandle.getNumberOfKeyGroups() > 0) {
 					currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator();
-					closableRegistry.unregisterClosable(currentStream);
-					IOUtils.closeQuietly(currentStream);
-					currentStream = null;
 
 					return true;
 				}
@@ -173,46 +169,33 @@ public class StateInitializationContextImpl implements StateInitializationContex
 			return false;
 		}
 
-		private void openStream() throws IOException {
-			FSDataInputStream stream = currentStateHandle.openInputStream();
-			closableRegistry.registerClosable(stream);
-			currentStream = stream;
-		}
-
 		@Override
 		public KeyGroupStatePartitionStreamProvider next() {
 
 			if (!hasNext()) {
+
 				throw new NoSuchElementException("Iterator exhausted");
 			}
 
 			Tuple2<Integer, Long> keyGroupOffset = currentOffsetsIterator.next();
 			try {
 				if (null == currentStream) {
-					openStream();
+					openCurrentStream();
 				}
 				currentStream.seek(keyGroupOffset.f1);
+
 				return new KeyGroupStatePartitionStreamProvider(currentStream, keyGroupOffset.f0);
 			} catch (IOException ioex) {
+
 				return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0);
 			}
 		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException("Read only Iterator");
-		}
 	}
 
-	private static class OperatorStateStreamIterator implements Iterator<StatePartitionStreamProvider>
{
+	private static class OperatorStateStreamIterator
+			extends AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle>
{
 
 		private final String stateName; //TODO since we only support a single named state in raw,
this could be dropped
-
-		private final Iterator<OperatorStateHandle> stateHandleIterator;
-		private final CloseableRegistry closableRegistry;
-
-		private OperatorStateHandle currentStateHandle;
-		private FSDataInputStream currentStream;
 		private long[] offsets;
 		private int offPos;
 
@@ -221,18 +204,20 @@ public class StateInitializationContextImpl implements StateInitializationContex
 				Iterator<OperatorStateHandle> stateHandleIterator,
 				CloseableRegistry closableRegistry) {
 
+			super(stateHandleIterator, closableRegistry);
 			this.stateName = Preconditions.checkNotNull(stateName);
-			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
-			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
 		}
 
 		@Override
 		public boolean hasNext() {
 
 			if (null != offsets && offPos < offsets.length) {
+
 				return true;
 			}
 
+			closeCurrentStream();
+
 			while (stateHandleIterator.hasNext()) {
 				currentStateHandle = stateHandleIterator.next();
 				long[] offsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName);
@@ -241,10 +226,6 @@ public class StateInitializationContextImpl implements StateInitializationContex
 					this.offsets = offsets;
 					this.offPos = 0;
 
-					closableRegistry.unregisterClosable(currentStream);
-					IOUtils.closeQuietly(currentStream);
-					currentStream = null;
-
 					return true;
 				}
 			}
@@ -252,16 +233,11 @@ public class StateInitializationContextImpl implements StateInitializationContex
 			return false;
 		}
 
-		private void openStream() throws IOException {
-			FSDataInputStream stream = currentStateHandle.openInputStream();
-			closableRegistry.registerClosable(stream);
-			currentStream = stream;
-		}
-
 		@Override
 		public StatePartitionStreamProvider next() {
 
 			if (!hasNext()) {
+
 				throw new NoSuchElementException("Iterator exhausted");
 			}
 
@@ -269,15 +245,46 @@ public class StateInitializationContextImpl implements StateInitializationContex
 
 			try {
 				if (null == currentStream) {
-					openStream();
+					openCurrentStream();
 				}
 				currentStream.seek(offset);
 
 				return new StatePartitionStreamProvider(currentStream);
 			} catch (IOException ioex) {
+
 				return new StatePartitionStreamProvider(ioex);
 			}
 		}
+	}
+
+	abstract static class AbstractStateStreamIterator<T extends StatePartitionStreamProvider,
H extends StreamStateHandle>
+			implements Iterator<T> {
+
+		protected final Iterator<H> stateHandleIterator;
+		protected final CloseableRegistry closableRegistry;
+
+		protected H currentStateHandle;
+		protected FSDataInputStream currentStream;
+
+		public AbstractStateStreamIterator(
+				Iterator<H> stateHandleIterator,
+				CloseableRegistry closableRegistry) {
+
+			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
+			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+		}
+
+		protected void openCurrentStream() throws IOException {
+			FSDataInputStream stream = currentStateHandle.openInputStream();
+			closableRegistry.registerClosable(stream);
+			currentStream = stream;
+		}
+
+		protected void closeCurrentStream() {
+			closableRegistry.unregisterClosable(currentStream);
+			IOUtils.closeQuietly(currentStream);
+			currentStream = null;
+		}
 
 		@Override
 		public void remove() {


Mime
View raw message