flink-issues mailing list archives

From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #5021: [FLINK-8063][QS] QS client does not retry when an ...
Date Wed, 15 Nov 2017 16:45:04 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5021#discussion_r151183730
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
    @@ -439,6 +443,85 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
     	}
     
     	/**
    +	 * Tests that the correct exception is thrown if the query
    +	 * contains a wrong queryable state name.
    +	 */
    +	@Test
    +	public void testWrongQueryableStateName() throws Exception {
    +		// Config
    +		final Deadline deadline = TEST_TIMEOUT.fromNow();
    +
    +		final long numElements = 1024L;
    +
    +		JobID jobId = null;
    +		try {
    +			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +			env.setStateBackend(stateBackend);
    +			env.setParallelism(maxParallelism);
    +			// Very important, because cluster is shared between tests and we
    +			// don't explicitly check that all slots are available before
    +			// submitting.
    +			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
    +
    +			DataStream<Tuple2<Integer, Long>> source = env
    +					.addSource(new TestAscendingValueSource(numElements));
    +
    +			// Value state
    +			ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
    +					new ValueStateDescriptor<>("any", source.getType());
    +
    +			source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
    +				private static final long serialVersionUID = 7662520075515707428L;
    +
    +				@Override
    +				public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
    +					return value.f0;
    +				}
    +			}).asQueryableState("hakuna", valueState);
    +
    +			// Submit the job graph
    +			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +			jobId = jobGraph.getJobID();
    +
    +			cluster.submitJobDetached(jobGraph);
    +
    +			// wait until the job is running before starting to query.
    +			FutureUtils.toJava(cluster.getLeaderGateway(deadline.timeLeft())
    +					.ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobId, JobStatus.RUNNING), deadline.timeLeft())
    +					.mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class)));
    +
    +			CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = client.getKvState(
    +					jobId,
    +					"wrong-hankuna", // this is the wrong name.
    +					0,
    +					VoidNamespace.INSTANCE,
    +					BasicTypeInfo.INT_TYPE_INFO,
    +					VoidNamespaceTypeInfo.INSTANCE,
    +					valueState);
    +
    +			final CompletableFuture<?> completion = new CompletableFuture<>();
    +			future.whenComplete((result, throwable) -> {
    --- End diff --
    
    I don't think we need the `completion` future here. We could return null from the future on success, throw an exception on failure, and call `get()` on the result.
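
A minimal sketch of what that might look like, using only `java.util.concurrent` so it runs on its own. It is not the code from the pull request: `QueryFailureCheck`, `expectUnknownStateName`, and `UnknownStateNameException` are hypothetical names, and a plain `CompletableFuture<String>` stands in for the future returned by `client.getKvState(...)`. The check assumes the test expects the query to fail, so the expected exception counts as success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class QueryFailureCheck {

    /** Hypothetical stand-in for the exception a wrong state name should produce. */
    public static class UnknownStateNameException extends RuntimeException {
        public UnknownStateNameException(String message) {
            super(message);
        }
    }

    /**
     * Chains the check directly onto the query future instead of completing
     * a separate future by hand. The returned future completes with null if
     * the query failed as expected, and exceptionally in every other case.
     */
    public static CompletableFuture<Void> expectUnknownStateName(CompletableFuture<String> query) {
        return query.handle((result, throwable) -> {
            // Dependent stages may wrap the original failure in a CompletionException.
            Throwable cause = throwable instanceof CompletionException
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof UnknownStateNameException) {
                return null; // expected failure, nothing to report
            }
            throw new AssertionError(
                    "Expected UnknownStateNameException, got: "
                            + (throwable == null ? result : throwable),
                    throwable);
        });
    }

    public static void main(String[] args) throws Exception {
        // Case 1: the query fails with the expected exception, so get() returns normally.
        CompletableFuture<String> failing = new CompletableFuture<>();
        failing.completeExceptionally(new UnknownStateNameException("wrong-hankuna"));
        expectUnknownStateName(failing).get();
        System.out.println("expected failure observed");

        // Case 2: the query unexpectedly succeeds, so get() surfaces the AssertionError.
        try {
            expectUnknownStateName(CompletableFuture.completedFuture("value")).get();
        } catch (ExecutionException e) {
            System.out.println("check failed: " + e.getCause().getMessage());
        }
    }
}
```

Calling `get()` on the chained future rethrows anything raised inside the callback as the cause of an `ExecutionException`, so the test fails without extra bookkeeping. In the real test a timed `get` bounded by the deadline would presumably be preferable.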


---
