flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #5719: [FLINK-8959][tests] Port AccumulatorLiveITCase to ...
Date Mon, 19 Mar 2018 20:01:36 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5719#discussion_r175566292
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
---
    @@ -18,292 +18,188 @@
     
     package org.apache.flink.test.accumulators;
     
    -import org.apache.flink.api.common.JobExecutionResult;
    -import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.Plan;
    -import org.apache.flink.api.common.accumulators.Accumulator;
     import org.apache.flink.api.common.accumulators.IntCounter;
     import org.apache.flink.api.common.functions.RichFlatMapFunction;
     import org.apache.flink.api.common.io.OutputFormat;
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.ExecutionEnvironment;
    -import org.apache.flink.api.java.LocalEnvironment;
    +import org.apache.flink.client.program.ClusterClient;
     import org.apache.flink.configuration.AkkaOptions;
    -import org.apache.flink.configuration.ConfigConstants;
     import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HeartbeatManagerOptions;
    +import org.apache.flink.core.testutils.MultiShotLatch;
     import org.apache.flink.optimizer.DataStatistics;
     import org.apache.flink.optimizer.Optimizer;
     import org.apache.flink.optimizer.plan.OptimizedPlan;
     import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.akka.ListeningBehaviour;
    -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.AkkaActorGateway;
     import org.apache.flink.runtime.jobgraph.JobGraph;
    -import org.apache.flink.runtime.messages.JobManagerMessages;
    -import org.apache.flink.runtime.testingUtils.TestingCluster;
    -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
    -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
     import org.apache.flink.runtime.testingUtils.TestingUtils;
     import org.apache.flink.streaming.api.datastream.DataStream;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.testutils.category.Flip6;
     import org.apache.flink.util.Collector;
     import org.apache.flink.util.TestLogger;
     
    -import akka.actor.ActorRef;
    -import akka.actor.ActorSystem;
    -import akka.pattern.Patterns;
    -import akka.testkit.JavaTestKit;
    -import akka.util.Timeout;
    -import org.junit.After;
     import org.junit.Before;
    +import org.junit.ClassRule;
     import org.junit.Test;
    +import org.junit.experimental.categories.Category;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Map;
    -import java.util.concurrent.TimeUnit;
     
    -import scala.concurrent.Await;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
    -
    -import static org.junit.Assert.fail;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
     
     /**
    - * Tests the availability of accumulator results during runtime. The test case tests
a user-defined
    - * accumulator and Flink's internal accumulators for two consecutive tasks.
    - *
    - * <p>CHAINED[Source -> Map] -> Sink
    - *
    - * <p>Checks are performed as the elements arrive at the operators. Checks consist
of a message sent by
    - * the task to the task manager which notifies the job manager and sends the current
accumulators.
    - * The task blocks until the test has been notified about the current accumulator values.
    - *
    - * <p>A barrier between the operators ensures that that pipelining is disabled
for the streaming test.
    - * The batch job reads the records one at a time. The streaming code buffers the records
beforehand;
    - * that's why exact guarantees about the number of records read are very hard to make.
Thus, why we
    - * check for an upper bound of the elements read.
    + * Tests the availability of accumulator results during runtime.
      */
    +@Category(Flip6.class)
     public class AccumulatorLiveITCase extends TestLogger {
     
     	private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
     
    -	private static ActorSystem system;
    -	private static ActorGateway jobManagerGateway;
    -	private static ActorRef taskManager;
    -
    -	private static JobID jobID;
    -	private static JobGraph jobGraph;
    -
     	// name of user accumulator
     	private static final String ACCUMULATOR_NAME = "test";
     
    +	private static final long HEARTBEAT_INTERVAL = 50L;
    +
     	// number of heartbeat intervals to check
     	private static final int NUM_ITERATIONS = 5;
     
    -	private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
    +	private static final List<Integer> inputData = new ArrayList<>(NUM_ITERATIONS);
     
    -	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
    +	static {
    +		// generate test data
    +		for (int i = 0; i < NUM_ITERATIONS; i++) {
    +			inputData.add(i);
    +		}
    +	}
     
    -	@Before
    -	public void before() throws Exception {
    -		system = AkkaUtils.createLocalActorSystem(new Configuration());
    +	@ClassRule
    +	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +		new MiniClusterResource.MiniClusterResourceConfiguration(
    +			getConfiguration(),
    +			1,
    +			1),
    +		true);
     
    +	private static Configuration getConfiguration() {
     		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    -		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
     		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    -		TestingCluster testingCluster = new TestingCluster(config, false, true);
    -		testingCluster.start();
    -
    -		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
    -		taskManager = testingCluster.getTaskManagersAsJava().get(0);
    -
    -		// generate test data
    -		for (int i = 0; i < NUM_ITERATIONS; i++) {
    -			inputData.add(i, String.valueOf(i + 1));
    -		}
    +		config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
     
    -		NotifyingMapper.finished = false;
    +		return config;
     	}
     
    -	@After
    -	public void after() throws Exception {
    -		JavaTestKit.shutdownActorSystem(system);
    -		inputData.clear();
    +	@Before
    +	public void resetLatches() throws InterruptedException {
    +		NotifyingMapper.reset();
     	}
     
     	@Test
     	public void testBatch() throws Exception {
    -
    -		/** The program **/
    -		ExecutionEnvironment env = new BatchPlanExtractor();
    +		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
     		env.setParallelism(1);
     
    -		DataSet<String> input = env.fromCollection(inputData);
    +		DataSet<Integer> input = env.fromCollection(inputData);
     		input
     				.flatMap(new NotifyingMapper())
    -				.output(new NotifyingOutputFormat());
    -
    -		env.execute();
    +				.output(new DummyOutputFormat());
     
     		// Extract job graph and set job id for the task to notify of accumulator changes.
    -		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
    -		jobID = jobGraph.getJobID();
    +		JobGraph jobGraph = getJobGraph(env.createProgramPlan());
     
    -		verifyResults();
    +		submitJobAndVerifyResults(jobGraph);
     	}
     
     	@Test
     	public void testStreaming() throws Exception {
     
    -		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     		env.setParallelism(1);
     
    -		DataStream<String> input = env.fromCollection(inputData);
    +		DataStream<Integer> input = env.fromCollection(inputData);
     		input
     				.flatMap(new NotifyingMapper())
    -				.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
    +				.writeUsingOutputFormat(new DummyOutputFormat()).disableChaining();
     
    -		jobGraph = env.getStreamGraph().getJobGraph();
    -		jobID = jobGraph.getJobID();
    +		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
    -		verifyResults();
    +		submitJobAndVerifyResults(jobGraph);
     	}
     
    -	private static void verifyResults() {
    -		new JavaTestKit(system) {{
    -
    -			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
    -
    -			// register for accumulator changes
    -			jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID),
selfGateway);
    -			expectMsgEquals(TIMEOUT, true);
    -
    -			// submit job
    -
    -			jobManagerGateway.tell(
    -					new JobManagerMessages.SubmitJob(
    -							jobGraph,
    -							ListeningBehaviour.EXECUTION_RESULT),
    -					selfGateway);
    -			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
    -
    -			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators)
receiveOne(TIMEOUT);
    -			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
    +	private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
     
    -			ExecutionAttemptID mapperTaskID = null;
    -
    -			ExecutionAttemptID sinkTaskID = null;
    -
    -			/* Check for accumulator values */
    -			if (checkUserAccumulators(0, userAccumulators)) {
    -				LOG.info("Passed initial check for map task.");
    -			} else {
    -				fail("Wrong accumulator results when map task begins execution.");
    -			}
    -
    -			int expectedAccVal = 0;
    -
    -			/* for mapper task */
    -			for (int i = 1; i <= NUM_ITERATIONS; i++) {
    -				expectedAccVal += i;
    -
    -				// receive message
    -				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    -				userAccumulators = msg.userAccumulators();
    -
    -				LOG.info("{}", userAccumulators);
    -
    -				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -					LOG.info("Passed round #" + i);
    -				} else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -					// we determined the wrong task id and need to switch the two here
    -					ExecutionAttemptID temp = mapperTaskID;
    -					mapperTaskID = sinkTaskID;
    -					sinkTaskID = temp;
    -					LOG.info("Passed round #" + i);
    -				} else {
    -					fail("Failed in round #" + i);
    -				}
    -			}
    -
    -			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    -			userAccumulators = msg.userAccumulators();
    -
    -			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -				LOG.info("Passed initial check for sink task.");
    -			} else {
    -				fail("Wrong accumulator results when sink task begins execution.");
    -			}
    +		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
     
    -			/* for sink task */
    -			for (int i = 1; i <= NUM_ITERATIONS; i++) {
    +		client.setDetached(true);
    +		client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
     
    -				// receive message
    -				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    +		try {
    +			NotifyingMapper.notifyLatch.await();
    +			Thread.sleep(HEARTBEAT_INTERVAL * 4); // wait for heartbeat
    --- End diff --
    
    Well....yeah we could use a deadline and polling, probably will be even faster.


---

Mime
View raw message