flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [23/35] flink git commit: [FLINK-8586][tests] Clean up hard to maintain tests
Date Mon, 19 Feb 2018 14:08:16 GMT
[FLINK-8586][tests] Clean up hard to maintain tests

SpilledSubpartitionViewTest duplicates a lot of production logic (TestSubpartitionConsumer
is a
duplicated logic of LocalInputChannel and mix of CreditBasedSequenceNumberingViewReader with
PartitionRequestQueue.
Also it seems like most of the logic is covered by SpillableSubpartitionTest.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/89605adb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/89605adb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/89605adb

Branch: refs/heads/master
Commit: 89605adb68b5cbbed6b0370355ec5ab343059910
Parents: 635c29d
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Tue Jan 30 09:17:50 2018 +0100
Committer: Piotr Nowojski <piotr.nowojski@gmail.com>
Committed: Mon Feb 19 12:21:34 2018 +0100

----------------------------------------------------------------------
 .../partition/SpilledSubpartitionViewTest.java  | 224 -------------------
 .../network/util/TestSubpartitionConsumer.java  |   1 +
 2 files changed, 1 insertion(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/89605adb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
deleted file mode 100644
index 08444f9..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
-import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.AfterClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for {@link SpillableSubpartitionView}, in addition to indirect tests via {@link
- * SpillableSubpartitionTest}.
- */
-public class SpilledSubpartitionViewTest {
-
-	private static final IOManager IO_MANAGER = new IOManagerAsync();
-
-	@AfterClass
-	public static void shutdown() {
-		IO_MANAGER.shutdown();
-	}
-
-	@Test
-	public void testWriteConsume() throws Exception {
-		// Config
-		final int numberOfBuffersToWrite = 512;
-
-		// Setup
-		final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);
-
-		writer.close();
-
-		TestPooledBufferProvider viewBufferPool = new TestPooledBufferProvider(1);
-
-		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
-			false, new TestConsumerCallback.RecyclingCallback());
-
-		SpilledSubpartitionView view = new SpilledSubpartitionView(
-			mock(SpillableSubpartition.class),
-			viewBufferPool.getMemorySegmentSize(),
-			writer,
-			numberOfBuffersToWrite + 1, // +1 for end-of-partition
-			consumer);
-
-		consumer.setSubpartitionView(view);
-
-		// Consume subpartition
-		consumer.call();
-	}
-
-	@Test
-	public void testConsumeWithFewBuffers() throws Exception {
-		// Config
-		final int numberOfBuffersToWrite = 512;
-
-		// Setup
-		final BufferFileWriter writer = createWriterAndWriteBuffers(numberOfBuffersToWrite);
-
-		writer.close();
-
-		TestSubpartitionConsumer consumer = new TestSubpartitionConsumer(
-			false, new TestConsumerCallback.RecyclingCallback());
-
-		SpilledSubpartitionView view = new SpilledSubpartitionView(
-			mock(SpillableSubpartition.class),
-			32 * 1024,
-			writer,
-			numberOfBuffersToWrite + 1,
-			consumer);
-
-		consumer.setSubpartitionView(view);
-
-		// No buffer available, don't deadlock. We need to make progress in situations when the
view
-		// is consumed at an input gate with local and remote channels. The remote channels might
-		// eat up all the buffers, at which point the spilled view will not have any buffers
-		// available and the input gate can't make any progress if we don't return immediately.
-		//
-		// The current solution is straight-forward with a separate buffer per spilled subpartition,
-		// but introduces memory-overhead.
-		//
-		// TODO Replace with asynchronous buffer pool request as this introduces extra buffers
per
-		// consumed subpartition.
-		consumer.call();
-	}
-
-	@Test
-	public void testReadMultipleFilesWithSingleBufferPool() throws Exception {
-		ExecutorService executor = null;
-		BufferFileWriter[] writers = null;
-		ResultSubpartitionView[] readers = null;
-
-		try {
-			executor = Executors.newCachedThreadPool();
-
-			// Setup
-			writers = new BufferFileWriter[]{
-				createWriterAndWriteBuffers(512),
-				createWriterAndWriteBuffers(512)
-			};
-
-			readers = new ResultSubpartitionView[writers.length];
-			TestSubpartitionConsumer[] consumers = new TestSubpartitionConsumer[writers.length];
-
-			BufferProvider inputBuffers = new TestPooledBufferProvider(2);
-
-			SpillableSubpartition parent = mock(SpillableSubpartition.class);
-
-			// Wait for writers to finish
-			for (BufferFileWriter writer : writers) {
-				writer.close();
-			}
-
-			// Create the views depending on the test configuration
-			for (int i = 0; i < readers.length; i++) {
-				consumers[i] = new TestSubpartitionConsumer(
-					false, new TestConsumerCallback.RecyclingCallback());
-
-				readers[i] = new SpilledSubpartitionView(
-					parent,
-					inputBuffers.getMemorySegmentSize(),
-					writers[i],
-					512 + 1, // +1 for end of partition event
-					consumers[i]);
-
-				consumers[i].setSubpartitionView(readers[i]);
-			}
-
-			final List<Future<Boolean>> results = Lists.newArrayList();
-
-			// Submit the consuming tasks
-			for (TestSubpartitionConsumer consumer : consumers) {
-				results.add(executor.submit(consumer));
-			}
-
-			// Wait for the results
-			for (Future<Boolean> res : results) {
-				try {
-					res.get(2, TimeUnit.MINUTES);
-				} catch (TimeoutException e) {
-					throw new TimeoutException("There has been a timeout in the test. This " +
-						"indicates that there is a bug/deadlock in the tested subpartition " +
-						"view.");
-				}
-			}
-		} finally {
-			if (writers != null) {
-				for (BufferFileWriter writer : writers) {
-					if (writer != null) {
-						writer.deleteChannel();
-					}
-				}
-			}
-
-			if (readers != null) {
-				for (ResultSubpartitionView reader : readers) {
-					if (reader != null) {
-						reader.releaseAllResources();
-					}
-				}
-			}
-
-			if (executor != null) {
-				executor.shutdown();
-			}
-		}
-	}
-
-	/**
-	 * Returns a buffer file writer, to which the specified number of buffer write requests
have
-	 * been issued (including an end of partition event).
-	 *
-	 * <p> Call {@link BufferFileWriter#close()} to ensure that all buffers have been
written.
-	 */
-	private static BufferFileWriter createWriterAndWriteBuffers(int numberOfBuffers) throws
IOException {
-
-		final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());
-
-		for (int i = 0; i < numberOfBuffers; i++) {
-			writer.writeBlock(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE));
-		}
-
-		writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
-
-		return writer;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/89605adb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index b4bdd3e..2c6ee50 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -40,6 +40,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * @see TestConsumerCallback
  */
+@Deprecated
 public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvailabilityListener
{
 
 	private static final int MAX_SLEEP_TIME_MS = 20;


Mime
View raw message