flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #5062: [FLINK-7880][QS] Wait for proper resource cleanup ...
Date Wed, 06 Dec 2017 12:43:01 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5062#discussion_r155225199
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java ---
    @@ -260,89 +260,90 @@ public void testDuplicateRegistrationFailsJob() throws Exception
{
     		final Deadline deadline = TEST_TIMEOUT.fromNow();
     		final int numKeys = 256;
     
    -		JobID jobId = null;
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +		env.setStateBackend(stateBackend);
    +		env.setParallelism(maxParallelism);
    +		// Very important, because cluster is shared between tests and we
    +		// don't explicitly check that all slots are available before
    +		// submitting.
    +		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
     
    -		try {
    -			//
    -			// Test program
    -			//
    -			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    -			env.setStateBackend(stateBackend);
    -			env.setParallelism(maxParallelism);
    -			// Very important, because cluster is shared between tests and we
    -			// don't explicitly check that all slots are available before
    -			// submitting.
    -			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000L));
    -
    -			DataStream<Tuple2<Integer, Long>> source = env
    -					.addSource(new TestKeyRangeSource(numKeys));
    -
    -			// Reducing state
    -			ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
    -					"any-name",
    -					new SumReduce(),
    -					source.getType());
    -
    -			final String queryName = "duplicate-me";
    -
    -			final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState
=
    -					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
    -						private static final long serialVersionUID = -4126824763829132959L;
    -
    -						@Override
    -						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
    -							return value.f0;
    -						}
    -					}).asQueryableState(queryName, reducingState);
    +		DataStream<Tuple2<Integer, Long>> source = env.addSource(new TestKeyRangeSource(numKeys));
     
    -			final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
    -					source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
    -						private static final long serialVersionUID = -6265024000462809436L;
    +		// Reducing state
    +		ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState = new ReducingStateDescriptor<>(
    +				"any-name",
    +				new SumReduce(),
    +				source.getType());
     
    -						@Override
    -						public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
    -							return value.f0;
    -						}
    -					}).asQueryableState(queryName);
    +		final String queryName = "duplicate-me";
     
    -			// Submit the job graph
    -			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    -			jobId = jobGraph.getJobID();
    +		final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState
=
    +				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
    +					private static final long serialVersionUID = -4126824763829132959L;
     
    -			final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture
=
    -					notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
    +					@Override
    +					public Integer getKey(Tuple2<Integer, Long> value) {
    +						return value.f0;
    +					}
    +				}).asQueryableState(queryName, reducingState);
     
    -			cluster.submitJobDetached(jobGraph);
    +		final QueryableStateStream<Integer, Tuple2<Integer, Long>> duplicate =
    +				source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
    +					private static final long serialVersionUID = -6265024000462809436L;
     
    -			TestingJobManagerMessages.JobStatusIs jobStatus =
    -					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    -			assertEquals(JobStatus.FAILED, jobStatus.state());
    +					@Override
    +					public Integer getKey(Tuple2<Integer, Long> value) {
    +						return value.f0;
    +					}
    +				}).asQueryableState(queryName);
    +
    +		// Submit the job graph
    +		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    +		final JobID jobId = jobGraph.getJobID();
    +
    +		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> failedFuture =
    +				notifyWhenJobStatusIs(jobId, JobStatus.FAILED, deadline);
     
    -			// Get the job and check the cause
    -			JobManagerMessages.JobFound jobFound = FutureUtils.toJava(
    -					cluster.getLeaderGateway(deadline.timeLeft())
    -							.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft())
    -							.mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobFound>apply(JobManagerMessages.JobFound.class)))
    -					.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    +		final CompletableFuture<TestingJobManagerMessages.JobStatusIs> cancellationFuture
=
    +				notifyWhenJobStatusIs(jobId, JobStatus.CANCELED, deadline);
     
    -			String failureCause = jobFound.executionGraph().getFailureCause().getExceptionAsString();
    +		cluster.submitJobDetached(jobGraph);
     
    -			assertTrue("Not instance of SuppressRestartsException", failureCause.startsWith("org.apache.flink.runtime.execution.SuppressRestartsException"));
    -			int causedByIndex = failureCause.indexOf("Caused by: ");
    -			String subFailureCause = failureCause.substring(causedByIndex + "Caused by: ".length());
    -			assertTrue("Not caused by IllegalStateException", subFailureCause.startsWith("java.lang.IllegalStateException"));
    -			assertTrue("Exception does not contain registration name", subFailureCause.contains(queryName));
    +		try {
    +			final TestingJobManagerMessages.JobStatusIs jobStatus =
    +					failedFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    +			assertEquals(JobStatus.FAILED, jobStatus.state());
    --- End diff --
    
    Isn't this always true if the future did not time out? (If it does time out, get() throws a TimeoutException instead.)


---

Mime
View raw message