flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/3] flink git commit: [FLINK-5040] [taskmanager] Adjust partition request backoffs
Date Fri, 11 Nov 2016 08:45:02 GMT
[FLINK-5040] [taskmanager] Adjust partition request backoffs

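The partition request backoffs were previously hard-coded, which made it
impossible to react to any potential problems with them. They are now
configurable via 'taskmanager.net.request-backoff.initial' and
'taskmanager.net.request-backoff.max'. This also fixes SingleInputGate.create,
which passed the initial backoff as the maximum backoff when creating
channels with a connection ID.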

This closes #2784.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d5637b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d5637b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d5637b0

Branch: refs/heads/master
Commit: 5d5637b01031746b2dfadf6d7fcd59155f7de653
Parents: 2742d5c
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Nov 10 11:15:47 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Nov 11 09:41:39 2016 +0100

----------------------------------------------------------------------
 .../flink/configuration/TaskManagerOptions.java | 24 ++++--
 .../ResultPartitionDeploymentDescriptor.java    |  8 +-
 .../partition/consumer/SingleInputGate.java     | 10 ++-
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../NetworkEnvironmentConfiguration.scala       | 14 ++--
 .../flink/runtime/taskmanager/TaskManager.scala |  9 ++-
 ...ResultPartitionDeploymentDescriptorTest.java |  2 +-
 .../partition/consumer/SingleInputGateTest.java | 84 ++++++++++++++++++++
 ...askManagerComponentsStartupShutdownTest.java |  4 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  5 ++
 10 files changed, 140 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index e5d36aa..6f6238b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -35,6 +35,20 @@ public class TaskManagerOptions {
 	// @TODO Migrate 'taskmanager.*' config options from ConfigConstants
 
 	// ------------------------------------------------------------------------
+	//  Network Options
+	// ------------------------------------------------------------------------
+
+	/** Minimum backoff for partition requests of input channels. */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
+			key("taskmanager.net.request-backoff.initial")
+			.defaultValue(100);
+
+	/** Maximum backoff for partition requests of input channels. */
+	public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
+			key("taskmanager.net.request-backoff.max")
+			.defaultValue(10000);
+
+	// ------------------------------------------------------------------------
 	//  Task Options
 	// ------------------------------------------------------------------------
 
@@ -44,8 +58,8 @@ public class TaskManagerOptions {
 	 */
 	public static final ConfigOption<Long> TASK_CANCELLATION_INTERVAL =
 			key("task.cancellation.interval")
-					.defaultValue(30000L)
-					.withDeprecatedKeys("task.cancellation-interval");
+			.defaultValue(30000L)
+			.withDeprecatedKeys("task.cancellation-interval");
 
 	/**
 	 * Timeout in milliseconds after which a task cancellation times out and
@@ -54,19 +68,19 @@ public class TaskManagerOptions {
 	 */
 	public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT =
 			key("task.cancellation.timeout")
-					.defaultValue(180000L);
+			.defaultValue(180000L);
 
 	/**
 	 * The maximum number of bytes that a checkpoint alignment may buffer.
 	 * If the checkpoint alignment buffers more than the configured amount of
 	 * data, the checkpoint is aborted (skipped).
-	 * 
+	 *
 	 * <p>The default value of {@code -1} indicates that there is no limit.
 	 */
 	public static final ConfigOption<Long> TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT =
 			key("task.checkpoint.alignment.max-size")
 			.defaultValue(-1L);
-	
+
 	// ------------------------------------------------------------------------
 
 	/** Not intended to be instantiated */

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 2ecde80..14c7d2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -49,7 +49,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 	private final int numberOfSubpartitions;
 	
 	/** Flag whether the result partition should send scheduleOrUpdateConsumer messages. */
-	private final boolean lazyScheduling;
+	private final boolean sendScheduleOrUpdateConsumersMessage;
 
 	public ResultPartitionDeploymentDescriptor(
 			IntermediateDataSetID resultId,
@@ -64,7 +64,7 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 
 		checkArgument(numberOfSubpartitions >= 1);
 		this.numberOfSubpartitions = numberOfSubpartitions;
-		this.lazyScheduling = lazyScheduling;
+		this.sendScheduleOrUpdateConsumersMessage = lazyScheduling;
 	}
 
 	public IntermediateDataSetID getResultId() {
@@ -83,8 +83,8 @@ public class ResultPartitionDeploymentDescriptor implements Serializable {
 		return numberOfSubpartitions;
 	}
 
-	public boolean allowLazyScheduling() {
-		return lazyScheduling;
+	public boolean sendScheduleOrUpdateConsumersMessage() {
+		return sendScheduleOrUpdateConsumersMessage;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index af5fd89..8f57542 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -520,6 +521,13 @@ public class SingleInputGate implements InputGate {
 
 	// ------------------------------------------------------------------------
 
+	@VisibleForTesting
+	Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
+		return inputChannels;
+	}
+
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Creates an input gate and all of its input channels.
 	 */
@@ -565,7 +573,7 @@ public class SingleInputGate implements InputGate {
 					partitionLocation.getConnectionId(),
 					networkEnvironment.getConnectionManager(),
 					networkEnvironment.getPartitionRequestInitialBackoff(),
-					networkEnvironment.getPartitionRequestInitialBackoff(),
+					networkEnvironment.getPartitionRequestMaxBackoff(),
 					metrics
 				);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 4f3dd54..b960e68 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -346,7 +346,7 @@ public class Task implements Runnable, TaskActions {
 				resultPartitionConsumableNotifier,
 				ioManager,
 				networkEnvironment.getDefaultIOMode(),
-				desc.allowLazyScheduling());
+				desc.sendScheduleOrUpdateConsumersMessage());
 
 			writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 14589a1..6a59665 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -23,10 +23,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 
 case class NetworkEnvironmentConfiguration(
-  numNetworkBuffers: Int,
-  networkBufferSize: Int,
-  memoryType: MemoryType,
-  ioMode: IOMode,
-  nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialBackoff: Int = 500,
-  partitinRequestMaxBackoff: Int = 3000)
+    numNetworkBuffers: Int,
+    networkBufferSize: Int,
+    memoryType: MemoryType,
+    ioMode: IOMode,
+    partitionRequestInitialBackoff : Int,
+    partitionRequestMaxBackoff : Int,
+    nettyConfig: Option[NettyConfig] = None)

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 4bb2da4..dd5d218 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1982,7 +1982,7 @@ object TaskManager {
       kvStateServer,
       netConfig.ioMode,
       netConfig.partitionRequestInitialBackoff,
-      netConfig.partitinRequestMaxBackoff)
+      netConfig.partitionRequestMaxBackoff)
 
     network.start()
 
@@ -2258,11 +2258,18 @@ object TaskManager {
 
     val ioMode : IOMode = if (syncOrAsync == "async") IOMode.ASYNC else IOMode.SYNC
 
+    val initialRequestBackoff = configuration.getInteger(
+      TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL)
+    val maxRequestBackoff = configuration.getInteger(
+      TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX)
+
     val networkConfig = NetworkEnvironmentConfiguration(
       numNetworkBuffers,
       pageSize,
       memType,
       ioMode,
+      initialRequestBackoff,
+      maxRequestBackoff,
       nettyConfig)
 
     // ----> timeouts, library caching, profiling

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 4223b49..3ed8236 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -55,6 +55,6 @@ public class ResultPartitionDeploymentDescriptorTest {
 		assertEquals(partitionId, copy.getPartitionId());
 		assertEquals(partitionType, copy.getPartitionType());
 		assertEquals(numberOfSubpartitions, copy.getNumberOfSubpartitions());
-		assertTrue(copy.allowLazyScheduling());
+		assertTrue(copy.sendScheduleOrUpdateConsumersMessage());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 8f9ea9e..0b7b10d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -21,11 +21,14 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,9 +45,12 @@ import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -272,6 +278,84 @@ public class SingleInputGateTest {
 	}
 
 	/**
+	 * Tests request back off configuration is correctly forwarded to the channels.
+	 */
+	@Test
+	public void testRequestBackoffConfiguration() throws Exception {
+		ResultPartitionID[] partitionIds = new ResultPartitionID[] {
+			new ResultPartitionID(),
+			new ResultPartitionID(),
+			new ResultPartitionID()
+		};
+
+		InputChannelDeploymentDescriptor[] channelDescs = new InputChannelDeploymentDescriptor[]{
+			// Local
+			new InputChannelDeploymentDescriptor(
+				partitionIds[0],
+				ResultPartitionLocation.createLocal()),
+			// Remote
+			new InputChannelDeploymentDescriptor(
+				partitionIds[1],
+				ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost",
5000), 0))),
+			// Unknown
+			new InputChannelDeploymentDescriptor(
+				partitionIds[2],
+				ResultPartitionLocation.createUnknown())};
+
+		InputGateDeploymentDescriptor gateDesc = new InputGateDeploymentDescriptor(new IntermediateDataSetID(),
0, channelDescs);
+
+		int initialBackoff = 137;
+		int maxBackoff = 1001;
+
+		NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
+		when(netEnv.getResultPartitionManager()).thenReturn(new ResultPartitionManager());
+		when(netEnv.getTaskEventDispatcher()).thenReturn(new TaskEventDispatcher());
+		when(netEnv.getPartitionRequestInitialBackoff()).thenReturn(initialBackoff);
+		when(netEnv.getPartitionRequestMaxBackoff()).thenReturn(maxBackoff);
+		when(netEnv.getConnectionManager()).thenReturn(new LocalConnectionManager());
+
+		SingleInputGate gate = SingleInputGate.create(
+			"TestTask",
+			new JobID(),
+			new ExecutionAttemptID(),
+			gateDesc,
+			netEnv,
+			mock(TaskActions.class),
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+
+		Map<IntermediateResultPartitionID, InputChannel> channelMap = gate.getInputChannels();
+
+		assertEquals(3, channelMap.size());
+		InputChannel localChannel = channelMap.get(partitionIds[0].getPartitionId());
+		assertEquals(LocalInputChannel.class, localChannel.getClass());
+
+		InputChannel remoteChannel = channelMap.get(partitionIds[1].getPartitionId());
+		assertEquals(RemoteInputChannel.class, remoteChannel.getClass());
+
+		InputChannel unknownChannel = channelMap.get(partitionIds[2].getPartitionId());
+		assertEquals(UnknownInputChannel.class, unknownChannel.getClass());
+
+		InputChannel[] channels = new InputChannel[]{localChannel, remoteChannel, unknownChannel};
+		for (InputChannel ch : channels) {
+			assertEquals(0, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(initialBackoff * 2 * 2, ch.getCurrentBackoff());
+
+			assertTrue(ch.increaseBackoff());
+			assertEquals(maxBackoff, ch.getCurrentBackoff());
+
+			assertFalse(ch.increaseBackoff());
+		}
+	}
+
+	/**
 	 * Returns whether the stack trace represents a Thread in a blocking queue
 	 * poll call.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 0bcd1ce..f9434e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -104,7 +104,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 					config);
 
 			final NetworkEnvironmentConfiguration netConf = new NetworkEnvironmentConfiguration(
-					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, Option.<NettyConfig>empty(),
0, 0);
+					32, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, Option.<NettyConfig>empty());
 
 			ResourceID taskManagerId = ResourceID.generate();
 			
@@ -121,7 +121,7 @@ public class TaskManagerComponentsStartupShutdownTest {
 				null,
 				netConf.ioMode(),
 				netConf.partitionRequestInitialBackoff(),
-				netConf.partitinRequestMaxBackoff());
+				netConf.partitionRequestMaxBackoff());
 
 			network.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d5637b0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 22f0c60..fd9ff05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -903,6 +904,8 @@ public class TaskManagerTest extends TestLogger {
 				final int dataPort = NetUtils.getAvailablePort();
 				Configuration config = new Configuration();
 				config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
 				taskManager = TestingUtils.createTaskManager(
 						system,
@@ -998,6 +1001,8 @@ public class TaskManagerTest extends TestLogger {
 				jobManager = new AkkaActorGateway(jm, leaderSessionID);
 
 				final Configuration config = new Configuration();
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+				config.setInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
 
 				taskManager = TestingUtils.createTaskManager(
 						system,


Mime
View raw message