flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-5507] [queryable state] Remove list variant of asQueryableState
Date Fri, 20 Jan 2017 11:32:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7ff7f431d -> 63a6af3be


[FLINK-5507] [queryable state] Remove list variant of asQueryableState

The queryable state "sink" using ListState stores all incoming data
forever and is never cleaned up. Eventually, it will consume too much
memory, so it is of limited use.

This closes #3129.
This closes #3120 (left over).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63a6af3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63a6af3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63a6af3b

Branch: refs/heads/master
Commit: 63a6af3bec17f2e45563d41032d1e60b8ee97d81
Parents: 7ff7f43
Author: Nico Kruber <nico@data-artisans.com>
Authored: Mon Jan 16 13:36:02 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Jan 20 12:32:18 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   |  25 -----
 .../flink/streaming/api/scala/KeyedStream.scala |  28 +----
 .../flink/test/query/QueryableStateITCase.java  | 104 -------------------
 3 files changed, 2 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 73d8926..3e3afd3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -682,30 +681,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Publishes the keyed stream as a queryable ListStance instance.
-	 *
-	 * @param queryableStateName Name under which to the publish the queryable state instance
-	 * @param stateDescriptor State descriptor to create state instance from
-	 * @return Queryable state instance
-	 */
-	@PublicEvolving
-	public QueryableStateStream<KEY, T> asQueryableState(
-			String queryableStateName,
-			ListStateDescriptor<T> stateDescriptor) {
-
-		transform("Queryable state: " + queryableStateName,
-				getType(),
-				new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
-
-		stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
-
-		return new QueryableStateStream<>(
-				queryableStateName,
-				getType().createSerializer(getExecutionConfig()),
-				getKeyType().createSerializer(getExecutionConfig()));
-	}
-
-	/**
 	 * Publishes the keyed stream as a queryable FoldingState instance.
 	 *
 	 * @param queryableStateName Name under which to the publish the queryable state instance

http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index b251ca6..99936e7 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
 import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
@@ -504,30 +504,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   }
 
   /**
-    * Publishes the keyed stream as a queryable ListState instance.
-    *
-    * @param queryableStateName Name under which to the publish the queryable state instance
-    * @param stateDescriptor State descriptor to create state instance from
-    * @return Queryable state instance
-    */
-  @PublicEvolving
-  def asQueryableState(
-     queryableStateName: String,
-      stateDescriptor: ListStateDescriptor[T]) : QueryableStateStream[K, T]  = {
-
-    transform(
-      s"Queryable state: $queryableStateName",
-      new QueryableAppendingStateOperator(queryableStateName, stateDescriptor))(dataType)
-
-    stateDescriptor.initializeSerializerUnlessSet(executionConfig)
-
-    new QueryableStateStream(
-      queryableStateName,
-      stateDescriptor.getSerializer,
-      getKeyType.createSerializer(executionConfig))
-  }
-
-  /**
     * Publishes the keyed stream as a queryable FoldingState instance.
     *
     * @param queryableStateName Name under which to the publish the queryable state instance

http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 88e4f9a..113f5c6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
@@ -821,109 +820,6 @@ public class QueryableStateITCase extends TestLogger {
 	}
 
 	/**
-	 * Tests simple list state queryable state instance. Each source emits
-	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
-	 * queried. The tests succeeds after each subtask index is queried with
-	 * a list of size numElements and each emitted tuple is part of the list.
-	 */
-	@Test
-	public void testListState() throws Exception {
-		// Config
-		final Deadline deadline = TEST_TIMEOUT.fromNow();
-
-		final int numElements = 128;
-
-		final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
-
-		JobID jobId = null;
-		try {
-			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-			env.setParallelism(NUM_SLOTS);
-			// Very important, because cluster is shared between tests and we
-			// don't explicitly check that all slots are available before
-			// submitting.
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
-
-			DataStream<Tuple2<Integer, Long>> source = env
-					.addSource(new TestAscendingValueSource(numElements));
-
-			// List state
-			ListStateDescriptor<Tuple2<Integer, Long>> listState = new ListStateDescriptor<>(
-					"any",
-					source.getType());
-
-			QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
-					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
-						@Override
-						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
-							return value.f0;
-						}
-					}).asQueryableState("timon", listState);
-
-			// Submit the job graph
-			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
-			jobId = jobGraph.getJobID();
-
-			cluster.submitJobDetached(jobGraph);
-
-			// Now query
-			long expected = numElements + 1; // +1 for 0-value
-
-			FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-			for (int key = 0; key < NUM_SLOTS; key++) {
-				final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-						key,
-						queryableState.getKeySerializer(),
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE);
-
-				boolean success = false;
-				while (deadline.hasTimeLeft() && !success) {
-					Future<byte[]> future = getKvStateWithRetries(client,
-							jobId,
-							queryableState.getQueryableStateName(),
-							key,
-							serializedKey,
-							retryDelay);
-
-					byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-					List<Tuple2<Integer, Long>> list = KvStateRequestSerializer.deserializeList(
-							serializedValue,
-							queryableState.getValueSerializer());
-
-					if (list.size() == expected) {
-						for (int i = 0; i < expected; i++) {
-							Tuple2<Integer, Long> elem = list.get(i);
-							assertEquals("Key mismatch", key, elem.f0.intValue());
-							assertEquals("Value mismatch", i, elem.f1.longValue());
-						}
-
-						success = true;
-					} else {
-						// Retry
-						Thread.sleep(50);
-					}
-				}
-
-				assertTrue("Did not succeed query", success);
-			}
-		} finally {
-			// Free cluster resources
-			if (jobId != null) {
-				Future<CancellationSuccess> cancellation = cluster
-						.getLeaderGateway(deadline.timeLeft())
-						.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft())
-						.mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(CancellationSuccess.class));
-
-				Await.ready(cancellation, deadline.timeLeft());
-			}
-
-			client.shutDown();
-		}
-	}
-
-	/**
 	 * Tests simple folding state queryable state instance. Each source emits
 	 * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
 	 * queried. The folding state sums these up and maps them to Strings. The


Mime
View raw message