flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/2] flink git commit: [hotfix] [cep] SharedBuffer refactoring.
Date Mon, 05 Feb 2018 16:20:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9ad1fa7cb -> 3d323ba10


[hotfix] [cep] SharedBuffer refactoring.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47638d02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47638d02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47638d02

Branch: refs/heads/master
Commit: 47638d02dffd808e266834b433bcc2d096781c75
Parents: 9ad1fa7
Author: kkloudas <kkloudas@gmail.com>
Authored: Thu Jan 25 10:59:30 2018 +0100
Committer: kkloudas <kkloudas@gmail.com>
Committed: Mon Feb 5 17:19:16 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 321 +++++++++----------
 1 file changed, 146 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/47638d02/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 802d98a..f34eb5f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -39,7 +39,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -76,7 +75,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	private transient Map<K, SharedBufferPage<K, V>> pages;
 
 	public SharedBuffer() {
-		this.pages = new HashMap<>();
+		this.pages = new HashMap<>(4);
 	}
 
 	/**
@@ -176,28 +175,26 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 * @return {@code true} if pruning happened
 	 */
 	public boolean prune(long pruningTimestamp) {
-		Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
-		List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
+		final List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
 
-		while (iter.hasNext()) {
-			SharedBufferPage<K, V> page = iter.next().getValue();
+		final Iterator<Map.Entry<K, SharedBufferPage<K, V>>> it = pages.entrySet().iterator();
+		while (it.hasNext()) {
+			SharedBufferPage<K, V> page = it.next().getValue();
 
 			page.prune(pruningTimestamp, prunedEntries);
-
 			if (page.isEmpty()) {
-				// delete page if it is empty
-				iter.remove();
+				it.remove();
 			}
 		}
 
-		if (!prunedEntries.isEmpty()) {
-			for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
-				entry.getValue().removeEdges(prunedEntries);
-			}
-			return true;
-		} else {
+		if (prunedEntries.isEmpty()) {
 			return false;
 		}
+
+		for (SharedBufferPage<K, V> entry : pages.values()) {
+			entry.removeEdges(prunedEntries);
+		}
+		return true;
 	}
 
 	/**
@@ -226,7 +223,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
 
 		if (entry != null) {
-			extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+			extractionStates.add(new ExtractionState<>(entry, version, new Stack<>()));
 
 			// use a depth first search to reconstruct the previous relations
 			while (!extractionStates.isEmpty()) {
@@ -250,7 +247,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 						}
 						values.add(currentPathEntry.getValueTime().getValue());
 					}
-
 					result.add(completePath);
 				} else {
 
@@ -283,7 +279,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 			}
 		}
-
 		return result;
 	}
 
@@ -327,7 +322,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			final long timestamp,
 			final int counter) {
 		SharedBufferPage<K, V> page = pages.get(key);
-		return page == null ? null : page.get(new ValueTimeWrapper<V>(value, timestamp, counter));
+		return page == null ? null : page.get(new ValueTimeWrapper<>(value, timestamp, counter));
 	}
 
 	private void internalRemove(final SharedBufferEntry<K, V> entry) {
@@ -355,8 +350,8 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		StringBuilder builder = new StringBuilder();
 
 		for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
-			builder.append("Key: ").append(entry.getKey()).append("\n");
-			builder.append("Value: ").append(entry.getValue()).append("\n");
+			builder.append("Key: ").append(entry.getKey()).append(System.lineSeparator());
+			builder.append("Value: ").append(entry.getValue()).append(System.lineSeparator());
 		}
 
 		return builder.toString();
@@ -387,13 +382,10 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 */
 	private static class SharedBufferPage<K, V> {
 
-		// key of the page
 		private final K key;
+		private final Map<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
 
-		// Map of entries which are stored in this page
-		private final HashMap<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
-
-		public SharedBufferPage(final K key) {
+		SharedBufferPage(final K key) {
 			this.key = key;
 			entries = new HashMap<>();
 		}
@@ -412,22 +404,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		 */
 		public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) {
 			SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime);
-
 			if (sharedBufferEntry == null) {
-				sharedBufferEntry = new SharedBufferEntry<K, V>(valueTime, this);
-
+				sharedBufferEntry = new SharedBufferEntry<>(valueTime, this);
 				entries.put(valueTime, sharedBufferEntry);
 			}
 
 			SharedBufferEdge<K, V> newEdge;
-
 			if (previous != null) {
 				newEdge = new SharedBufferEdge<>(previous, version);
 				previous.increaseReferenceCounter();
 			} else {
 				newEdge = new SharedBufferEdge<>(null, version);
 			}
-
 			sharedBufferEntry.addEdge(newEdge);
 		}
 
@@ -437,72 +425,63 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		/**
 		 * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
-		 *
 		 * @param pruningTimestamp Timestamp for the pruning
+		 * @param prunedEntries a {@link List} to put the removed {@link SharedBufferEntry SharedBufferEntries}.
 		 */
-		public void prune(long pruningTimestamp, List<SharedBufferEntry<K, V>> prunedEntries) {
-			Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
-			boolean continuePruning = true;
-
-			while (iterator.hasNext() && continuePruning) {
-				SharedBufferEntry<K, V> entry = iterator.next().getValue();
-
+		private void prune(final long pruningTimestamp, final List<SharedBufferEntry<K, V>> prunedEntries) {
+			Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> it = entries.entrySet().iterator();
+			while (it.hasNext()) {
+				SharedBufferEntry<K, V> entry = it.next().getValue();
 				if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
 					prunedEntries.add(entry);
-					iterator.remove();
-				} else {
-					continuePruning = false;
+					it.remove();
 				}
 			}
 		}
 
-		public boolean isEmpty() {
-			return entries.isEmpty();
+		/**
+		 * Remove edges with the specified targets for the entries.
+		 */
+		private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
+			for (SharedBufferEntry<K, V> entry : entries.values()) {
+				entry.removeEdges(prunedEntries);
+			}
 		}
 
 		public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) {
 			return entries.remove(valueTime);
 		}
 
-		/**
-		 * Remove edges with the specified targets for the entries.
-		 */
-		private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
-			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entry : entries.entrySet()) {
-				entry.getValue().removeEdges(prunedEntries);
-			}
+		public boolean isEmpty() {
+			return entries.isEmpty();
 		}
 
 		@Override
 		public String toString() {
 			StringBuilder builder = new StringBuilder();
-
-			builder.append("SharedBufferPage(\n");
-
+			builder.append("SharedBufferPage(" + System.lineSeparator());
 			for (SharedBufferEntry<K, V> entry: entries.values()) {
-				builder.append(entry.toString()).append("\n");
+				builder.append(entry).append(System.lineSeparator());
 			}
-
 			builder.append(")");
-
 			return builder.toString();
 		}
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj instanceof SharedBufferPage) {
-				@SuppressWarnings("unchecked")
-				SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
-
-				return key.equals(other.key) && entries.equals(other.entries);
-			} else {
+			if (!(obj instanceof SharedBufferPage)) {
 				return false;
 			}
+			SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
+			return key.equals(other.getKey()) && entries.equals(other.entries);
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(key, entries);
+			int result = 1;
+			result += 31 * result + key.hashCode();
+			result += 31 * result + entries.hashCode();
+			return result;
 		}
 	}
 
@@ -519,8 +498,9 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		private final ValueTimeWrapper<V> valueTime;
 		private final Set<SharedBufferEdge<K, V>> edges;
 		private final SharedBufferPage<K, V> page;
+
 		private int referenceCounter;
-		private transient int entryId;
+		private int entryId;
 
 		SharedBufferEntry(
 				final ValueTimeWrapper<V> valueTime,
@@ -532,17 +512,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 				final ValueTimeWrapper<V> valueTime,
 				final SharedBufferEdge<K, V> edge,
 				final SharedBufferPage<K, V> page) {
+
 			this.valueTime = valueTime;
 			edges = new HashSet<>();
-
 			if (edge != null) {
 				edges.add(edge);
 			}
-
 			referenceCounter = 0;
-
 			entryId = -1;
-
 			this.page = page;
 		}
 
@@ -550,7 +527,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			return valueTime;
 		}
 
-		public Collection<SharedBufferEdge<K, V>> getEdges() {
+		public Set<SharedBufferEdge<K, V>> getEdges() {
 			return edges;
 		}
 
@@ -578,14 +555,8 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			}
 		}
 
-		public boolean remove() {
-			if (page != null) {
-				page.remove(valueTime);
-
-				return true;
-			} else {
-				return false;
-			}
+		public void remove() {
+			page.remove(valueTime);
 		}
 
 		public void increaseReferenceCounter() {
@@ -609,22 +580,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj instanceof SharedBufferEntry) {
-				@SuppressWarnings("unchecked")
-				SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj;
+			if (!(obj instanceof SharedBufferEntry)) {
+				return false;
+			}
 
-				return valueTime.equals(other.valueTime) &&
+			@SuppressWarnings("unchecked")
+			SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj;
+
+			return valueTime.equals(other.valueTime) &&
 					getKey().equals(other.getKey()) &&
 					referenceCounter == other.referenceCounter &&
-					edges.equals(other.edges);
-			} else {
-				return false;
-			}
+					Objects.equals(edges, other.edges);
 		}
 
 		@Override
 		public int hashCode() {
-			return Objects.hash(valueTime, getKey(), referenceCounter, edges);
+			int result = 1;
+			result += 31 * result + valueTime.hashCode();
+			result += 31 * result + getKey().hashCode();
+			result += 31 * result + referenceCounter;
+			result += 31 * result + edges.hashCode();
+			return result;
 		}
 	}
 
@@ -634,11 +610,11 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 * @param <K> Type of the key
 	 * @param <V> Type of the value
 	 */
-	public static class SharedBufferEdge<K, V> {
+	private static class SharedBufferEdge<K, V> {
 		private final SharedBufferEntry<K, V> target;
 		private final DeweyNumber version;
 
-		public SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
+		SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
 			this.target = target;
 			this.version = version;
 		}
@@ -658,22 +634,21 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj instanceof SharedBufferEdge) {
-				@SuppressWarnings("unchecked")
-				SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj;
-
-				if (version.equals(other.version)) {
-					if (target == null && other.target == null) {
-						return true;
-					} else if (target != null && other.target != null) {
-						return target.getKey().equals(other.target.getKey()) &&
-							target.getValueTime().equals(other.target.getValueTime());
-					} else {
-						return false;
-					}
-				} else {
-					return false;
-				}
+			if (!(obj instanceof SharedBufferEdge)) {
+				return false;
+			}
+
+			@SuppressWarnings("unchecked")
+			SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj;
+			if (!version.equals(other.getVersion())) {
+				return false;
+			}
+
+			if (target == null && other.getTarget() == null) {
+				return true;
+			} else if (target != null && other.getTarget() != null) {
+				return target.getKey().equals(other.getTarget().getKey()) &&
+						target.getValueTime().equals(other.getTarget().getValueTime());
 			} else {
 				return false;
 			}
@@ -694,7 +669,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 *
 	 * @param <V> Type of the value
 	 */
-	static class ValueTimeWrapper<V> {
+	private static class ValueTimeWrapper<V> {
 
 		private final V value;
 		private final long timestamp;
@@ -730,20 +705,41 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj instanceof ValueTimeWrapper) {
-				@SuppressWarnings("unchecked")
-				ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
-
-				return timestamp == other.getTimestamp() && value.equals(other.getValue()) && counter == other.getCounter();
-			} else {
+			if (!(obj instanceof ValueTimeWrapper)) {
 				return false;
 			}
+
+			@SuppressWarnings("unchecked")
+			ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
+
+			return timestamp == other.getTimestamp()
+					&& Objects.equals(value, other.getValue())
+					&& counter == other.getCounter();
 		}
 
 		@Override
 		public int hashCode() {
 			return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
 		}
+
+		public void serialize(
+				final TypeSerializer<V> valueSerializer,
+				final DataOutputView target) throws IOException {
+			valueSerializer.serialize(value, target);
+			target.writeLong(timestamp);
+			target.writeInt(counter);
+		}
+
+		public static <V> ValueTimeWrapper<V> deserialize(
+				final TypeSerializer<V> valueSerializer,
+				final DataInputView source) throws IOException {
+
+			final V value = valueSerializer.deserialize(source);
+			final long timestamp = source.readLong();
+			final int counter = source.readInt();
+
+			return new ValueTimeWrapper<>(value, timestamp, counter);
+		}
 	}
 
 	/**
@@ -799,9 +795,9 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		public SharedBufferSerializerConfigSnapshot() {}
 
 		public SharedBufferSerializerConfigSnapshot(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<V> valueSerializer,
-				TypeSerializer<DeweyNumber> versionSerializer) {
+				final TypeSerializer<K> keySerializer,
+				final TypeSerializer<V> valueSerializer,
+				final TypeSerializer<DeweyNumber> versionSerializer) {
 
 			super(keySerializer, valueSerializer, versionSerializer);
 		}
@@ -824,15 +820,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		private final TypeSerializer<DeweyNumber> versionSerializer;
 
 		public SharedBufferSerializer(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<V> valueSerializer) {
+				final TypeSerializer<K> keySerializer,
+				final TypeSerializer<V> valueSerializer) {
 			this(keySerializer, valueSerializer, new DeweyNumber.DeweyNumberSerializer());
 		}
 
 		public SharedBufferSerializer(
-				TypeSerializer<K> keySerializer,
-				TypeSerializer<V> valueSerializer,
-				TypeSerializer<DeweyNumber> versionSerializer) {
+				final TypeSerializer<K> keySerializer,
+				final TypeSerializer<V> valueSerializer,
+				final TypeSerializer<DeweyNumber> versionSerializer) {
 
 			this.keySerializer = keySerializer;
 			this.valueSerializer = valueSerializer;
@@ -905,63 +901,50 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			// number of pages
 			target.writeInt(pages.size());
 
-			for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
-				SharedBufferPage<K, V> page = pageEntry.getValue();
+			for (SharedBufferPage<K, V> page: pages.values()) {
 
 				// key for the current page
 				keySerializer.serialize(page.getKey(), target);
 
-				// number of page entries
 				target.writeInt(page.entries.size());
-
-				for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-					SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+				for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
 
 					// assign id to the sharedBufferEntry for the future
 					// serialization of the previous relation
 					sharedBuffer.entryId = entryCounter++;
 
 					ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
+					valueTimeWrapper.serialize(valueSerializer, target);
+					target.writeInt(sharedBuffer.getReferenceCounter());
 
-					valueSerializer.serialize(valueTimeWrapper.getValue(), target);
-					target.writeLong(valueTimeWrapper.getTimestamp());
-					target.writeInt(valueTimeWrapper.getCounter());
-
-					int edges = sharedBuffer.edges.size();
-					totalEdges += edges;
-
-					target.writeInt(sharedBuffer.referenceCounter);
+					totalEdges += sharedBuffer.getEdges().size();
 				}
 			}
 
 			// write the edges between the shared buffer entries
 			target.writeInt(totalEdges);
 
-			for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry: pages.entrySet()) {
-				SharedBufferPage<K, V> page = pageEntry.getValue();
+			for (SharedBufferPage<K, V> page: pages.values()) {
+				for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
 
-				for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry: page.entries.entrySet()) {
-					SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();
+					// in order to serialize the previous relation we simply serialize
+					// the ids of the source and target SharedBufferEntry
 
-					int id = sharedBuffer.entryId;
-					Preconditions.checkState(id != -1, "Could not find id for entry: " + sharedBuffer);
+					int sourceId = sharedBuffer.entryId;
+					Preconditions.checkState(sourceId != -1,
+							"Could not find id for entry: " + sharedBuffer);
 
 					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
-						// in order to serialize the previous relation we simply serialize the ids
-						// of the source and target SharedBufferEntry
-						if (edge.target != null) {
-							int targetId = edge.getTarget().entryId;
+						int targetId = -1;
+						if (edge.getTarget() != null) {
+							targetId = edge.getTarget().entryId;
 							Preconditions.checkState(targetId != -1,
 									"Could not find id for entry: " + edge.getTarget());
-
-							target.writeInt(id);
-							target.writeInt(targetId);
-							versionSerializer.serialize(edge.version, target);
-						} else {
-							target.writeInt(id);
-							target.writeInt(-1);
-							versionSerializer.serialize(edge.version, target);
 						}
+
+						target.writeInt(sourceId);
+						target.writeInt(targetId);
+						versionSerializer.serialize(edge.getVersion(), target);
 					}
 				}
 			}
@@ -975,24 +958,16 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			int totalPages = source.readInt();
 
 			for (int i = 0; i < totalPages; i++) {
+
 				// key of the page
 				K key = keySerializer.deserialize(source);
-
 				SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-
 				pages.put(key, page);
 
 				int numberEntries = source.readInt();
-
 				for (int j = 0; j < numberEntries; j++) {
-					// restore the SharedBufferEntries for the given page
-					V value = valueSerializer.deserialize(source);
-					long timestamp = source.readLong();
-					int counter = source.readInt();
-
-					ValueTimeWrapper<V> valueTimeWrapper = new ValueTimeWrapper<>(value, timestamp, counter);
-					SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<K, V>(valueTimeWrapper, page);
-
+					ValueTimeWrapper<V> valueTimeWrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
+					SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<>(valueTimeWrapper, page);
 					sharedBufferEntry.referenceCounter = source.readInt();
 
 					page.entries.put(valueTimeWrapper, sharedBufferEntry);
@@ -1005,26 +980,22 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			int totalEdges = source.readInt();
 
 			for (int j = 0; j < totalEdges; j++) {
-				int sourceIndex = source.readInt();
-				Preconditions.checkState(sourceIndex < entryList.size() && sourceIndex >= 0,
-						"Could not find source entry with index " + sourceIndex + 	". This indicates a corrupted state.");
+				int sourceIdx = source.readInt();
+				Preconditions.checkState(sourceIdx < entryList.size() && sourceIdx >= 0,
+						"Could not find source entry with index " + sourceIdx + 	". This indicates a corrupted state.");
 
-				int targetIndex = source.readInt();
-				Preconditions.checkState(targetIndex < entryList.size(),
-						"Could not find target entry with index " + sourceIndex + 	". This indicates a corrupted state.");
+				int targetIdx = source.readInt();
+				Preconditions.checkState(targetIdx < entryList.size(),
+						"Could not find target entry with index " + sourceIdx + 	". This indicates a corrupted state.");
 
 				DeweyNumber version = versionSerializer.deserialize(source);
 
 				// We've already deserialized the shared buffer entry. Simply read its ID and
 				// retrieve the buffer entry from the list of entries
-				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIndex);
-				SharedBufferEntry<K, V> targetEntry = targetIndex < 0 ? null : entryList.get(targetIndex);
-
+				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIdx);
+				SharedBufferEntry<K, V> targetEntry = targetIdx < 0 ? null : entryList.get(targetIdx);
 				sourceEntry.edges.add(new SharedBufferEdge<>(targetEntry, version));
 			}
-			// here we put the old NonDuplicating serializer because this needs to create a copy
-			// of the buffer, as created by the NFA. There, for compatibility reasons, we have left
-			// the old serializer.
 			return new SharedBuffer<>(pages);
 		}
 


Mime
View raw message