flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [FLINK-5602] Introduce artifical namespace serializer for migration
Date Wed, 25 Jan 2017 16:56:16 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 103bb361c -> 80c9f8018


[FLINK-5602] Introduce artifical namespace serializer for migration

This closes #3200.
This closes #3198.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80c9f801
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80c9f801
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80c9f801

Branch: refs/heads/release-1.2
Commit: 80c9f801831693ec23014f322e492a2584c2e6d5
Parents: 5a435fc
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Tue Jan 24 14:35:42 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Jan 25 17:56:06 2017 +0100

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |   6 +-
 .../MigrationNamespaceSerializerProxy.java      | 116 +++++++++++++++++++
 .../state/RegisteredBackendStateMetaInfo.java   |  28 ++---
 3 files changed, 134 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80c9f801/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index fdb978b..ce010b5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
@@ -1140,7 +1141,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 			columnFamilyMapping.put(mappingByte, stateDescriptor);
 
 			// this will fill in the k/v state information
-			getColumnFamily(stateDescriptor, null);
+			getColumnFamily(stateDescriptor, MigrationNamespaceSerializerProxy.INSTANCE);
 		}
 
 		// try and read until EOF
@@ -1148,7 +1149,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 			// the EOFException will get us out of this...
 			while (true) {
 				byte mappingByte = inputView.readByte();
-				ColumnFamilyHandle handle = getColumnFamily(columnFamilyMapping.get(mappingByte),null);
+				ColumnFamilyHandle handle = getColumnFamily(
+						columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE);
 
 				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80c9f801/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
new file mode 100644
index 0000000..808a9ae
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/migration/MigrationNamespaceSerializerProxy.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.migration;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The purpose of this class is the be filled in as a placeholder for the namespace serializer
when migrating from
+ * Flink 1.1 savepoint (which did not include the namespace serializer) to Flink 1.2 (which
always must include a
+ * (non-null) namespace serializer. This is then replaced as soon as the user is re-registering
her state again for
+ * the first run under Flink 1.2 and provides again the real namespace serializer.
+ */
+@Deprecated
+public class MigrationNamespaceSerializerProxy extends TypeSerializer<Serializable>
{
+
+	public static final MigrationNamespaceSerializerProxy INSTANCE = new MigrationNamespaceSerializerProxy();
+
+	private static final long serialVersionUID = -707800010807094491L;
+
+	private MigrationNamespaceSerializerProxy() {
+	}
+
+	@Override
+	public boolean isImmutableType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<Serializable> duplicate() {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public Serializable createInstance() {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public Serializable copy(Serializable from) {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public Serializable copy(Serializable from, Serializable reuse) {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
+	}
+
+	@Override
+	public void serialize(Serializable record, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public Serializable deserialize(DataInputView source) throws IOException {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public Serializable deserialize(Serializable reuse, DataInputView source) throws IOException
{
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		throw new UnsupportedOperationException(
+				"This is just a proxy used during migration until the real type serializer is provided
by the user.");
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof MigrationNamespaceSerializerProxy;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return true;
+	}
+
+	@Override
+	public int hashCode() {
+		return 42;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/80c9f801/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
index 0c50486..80bdacd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBackendStateMetaInfo.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -38,10 +39,11 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 	private final TypeSerializer<S> stateSerializer;
 
 	public RegisteredBackendStateMetaInfo(KeyedBackendSerializationProxy.StateMetaInfo<N,
S> metaInfoProxy) {
-		this.stateType = metaInfoProxy.getStateType();
-		this.name = metaInfoProxy.getStateName();
-		this.namespaceSerializer = metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer();
-		this.stateSerializer = metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer();
+		this(
+				metaInfoProxy.getStateType(),
+				metaInfoProxy.getStateName(),
+				metaInfoProxy.getNamespaceSerializerSerializationProxy().getTypeSerializer(),
+				metaInfoProxy.getStateSerializerSerializationProxy().getTypeSerializer());
 	}
 
 	public RegisteredBackendStateMetaInfo(
@@ -52,8 +54,8 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 
 		this.stateType = checkNotNull(stateType);
 		this.name = checkNotNull(name);
-		this.namespaceSerializer = namespaceSerializer;
-		this.stateSerializer = stateSerializer;
+		this.namespaceSerializer = checkNotNull(namespaceSerializer);
+		this.stateSerializer = checkNotNull(stateSerializer);
 	}
 
 	public StateDescriptor.Type getStateType() {
@@ -92,10 +94,10 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			return false;
 		}
 
-		return ((namespaceSerializer == null && other.namespaceSerializer == null)
-					|| namespaceSerializer == null || other.namespaceSerializer == null
-					|| namespaceSerializer.isCompatibleWith(other.namespaceSerializer))
-				&& stateSerializer.isCompatibleWith(other.stateSerializer);
+		return (stateSerializer.isCompatibleWith(other.stateSerializer)) &&
+				(namespaceSerializer.isCompatibleWith(other.namespaceSerializer)
+						// we also check if there is just a migration proxy that should be replaced by any
real serializer
+						|| other.namespaceSerializer instanceof MigrationNamespaceSerializerProxy);
 	}
 
 	@Override
@@ -118,10 +120,8 @@ public class RegisteredBackendStateMetaInfo<N, S> {
 			return false;
 		}
 
-		if (getNamespaceSerializer() != null ? !getNamespaceSerializer().equals(that.getNamespaceSerializer())
: that.getNamespaceSerializer() != null) {
-			return false;
-		}
-		return getStateSerializer() != null ? getStateSerializer().equals(that.getStateSerializer())
: that.getStateSerializer() == null;
+		return getStateSerializer().equals(that.getStateSerializer())
+				&& getNamespaceSerializer().equals(that.getNamespaceSerializer());
 	}
 
 	@Override


Mime
View raw message