flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [1/2] flink git commit: [FLINK-7826][QS] Add support for all types of state to the QS Client.
Date Wed, 18 Oct 2017 08:03:41 GMT
Repository: flink
Updated Branches:
  refs/heads/master 717a7dc81 -> abc3e1c88


http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
new file mode 100644
index 0000000..5f7032d
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.queryablestate.client.state.ImmutableValueState;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests the {@link ImmutableValueState}.
+ */
+public class ImmutableValueStateTest {
+
+	private final ValueStateDescriptor<Long> valueStateDesc =
+			new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
+
+	private ImmutableValueState<Long> valueState;
+
+	@Before
+	public void setUp() throws Exception {
+		if (!valueStateDesc.isSerializerInitialized()) {
+			valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
+		}
+
+		valueState = ImmutableValueState.createState(
+				valueStateDesc,
+				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
+		);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testUpdate() {
+		long value = valueState.value();
+		assertEquals(42L, value);
+
+		valueState.update(54L);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testClear() {
+		long value = valueState.value();
+		assertEquals(42L, value);
+
+		valueState.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 51af430..ebcd7d5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -745,7 +745,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
 		return new QueryableStateStream<>(
 				queryableStateName,
-				stateDescriptor.getSerializer(),
+				stateDescriptor,
 				getKeyType().createSerializer(getExecutionConfig()));
 	}
 
@@ -772,7 +772,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
 		return new QueryableStateStream<>(
 				queryableStateName,
-				stateDescriptor.getSerializer(),
+				stateDescriptor,
 				getKeyType().createSerializer(getExecutionConfig()));
 	}
 
@@ -796,7 +796,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
 		return new QueryableStateStream<>(
 				queryableStateName,
-				stateDescriptor.getSerializer(),
+				stateDescriptor,
 				getKeyType().createSerializer(getExecutionConfig()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
index d0de2ab..7f20fd6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/QueryableStateStream.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Preconditions;
 
@@ -37,23 +38,23 @@ public class QueryableStateStream<K, V> {
 	/** Key serializer for the state instance. */
 	private final TypeSerializer<K> keySerializer;
 
-	/** Value serializer for the state instance. */
-	private final TypeSerializer<V> valueSerializer;
+	/** State descriptor for the state instance. */
+	private final StateDescriptor<?, V> stateDescriptor;
 
 	/**
 	 * Creates a queryable state stream.
 	 *
 	 * @param queryableStateName Name under which to publish the queryable state instance
-	 * @param valueSerializer Value serializer for the state instance
+	 * @param stateDescriptor The state descriptor for the state instance
 	 * @param keySerializer Key serializer for the state instance
 	 */
 	public QueryableStateStream(
 			String queryableStateName,
-			TypeSerializer<V> valueSerializer,
+			StateDescriptor<?, V> stateDescriptor,
 			TypeSerializer<K> keySerializer) {
 
 		this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Queryable state
name");
-		this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "Value serializer");
+		this.stateDescriptor = Preconditions.checkNotNull(stateDescriptor, "State Descriptor");
 		this.keySerializer = Preconditions.checkNotNull(keySerializer, "Key serializer");
 	}
 
@@ -67,15 +68,6 @@ public class QueryableStateStream<K, V> {
 	}
 
 	/**
-	 * Returns the value serializer for the queryable state instance.
-	 *
-	 * @return Value serializer for the state instance
-	 */
-	public TypeSerializer<V> getValueSerializer() {
-		return valueSerializer;
-	}
-
-	/**
 	 * Returns the key serializer for the queryable state instance.
 	 *
 	 * @return Key serializer for the state instance.
@@ -84,4 +76,12 @@ public class QueryableStateStream<K, V> {
 		return keySerializer;
 	}
 
+	/**
+	 * Returns the state descriptor for the queryable state instance.
+	 *
+	 * @return State descriptor for the state instance
+	 */
+	public StateDescriptor<?, V> getStateDescriptor() {
+		return stateDescriptor;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/abc3e1c8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index aaeb1ec..49bdbd9 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -497,7 +497,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
 
@@ -522,7 +522,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
 
@@ -546,7 +546,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
 
     new QueryableStateStream(
       queryableStateName,
-      stateDescriptor.getSerializer,
+      stateDescriptor,
       getKeyType.createSerializer(executionConfig))
   }
   


Mime
View raw message