flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [46/50] [abbrv] flink git commit: Rebase fixes
Date Tue, 01 Nov 2016 08:41:13 GMT
Rebase fixes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c4d8eb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c4d8eb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c4d8eb1

Branch: refs/heads/flip-6
Commit: 4c4d8eb144f34c0d454ce7b033e29f2ab75f5cea
Parents: 09ef605
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Oct 31 19:01:22 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 1 09:39:35 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/SlotDescriptor.java       | 15 ++++++++-------
 .../org/apache/flink/runtime/instance/SlotPool.java  |  4 ++--
 .../flink/runtime/taskexecutor/TaskExecutor.java     | 11 ++++++-----
 .../org/apache/flink/runtime/taskmanager/Task.java   | 10 +++++-----
 .../flink/runtime/taskmanager/TaskManager.scala      |  9 +--------
 .../flink/runtime/instance/AvailableSlotsTest.java   |  3 ++-
 .../flink/runtime/taskmanager/TaskManagerTest.java   |  3 ++-
 7 files changed, 26 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
index be7cf96..47ce422 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.instance;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -45,20 +46,20 @@ public class SlotDescriptor {
 	private final ResourceProfile resourceProfile;
 
 	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
-	private final ActorGateway taskManagerActorGateway;
+	private final TaskManagerGateway taskManagerGateway;
 
 	public SlotDescriptor(
 		final JobID jobID,
 		final TaskManagerLocation location,
 		final int slotNumber,
 		final ResourceProfile resourceProfile,
-		final ActorGateway actorGateway)
+		final TaskManagerGateway taskManagerGateway)
 	{
 		this.jobID = checkNotNull(jobID);
 		this.taskManagerLocation = checkNotNull(location);
 		this.slotNumber = slotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerActorGateway = checkNotNull(actorGateway);
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
 	}
 
 	public SlotDescriptor(final SlotDescriptor other) {
@@ -66,7 +67,7 @@ public class SlotDescriptor {
 		this.taskManagerLocation = other.taskManagerLocation;
 		this.slotNumber = other.slotNumber;
 		this.resourceProfile = other.resourceProfile;
-		this.taskManagerActorGateway = other.taskManagerActorGateway;
+		this.taskManagerGateway = other.taskManagerGateway;
 	}
 	
 	// TODO - temporary workaround until we have the SlotDesriptor in the Slot
@@ -75,7 +76,7 @@ public class SlotDescriptor {
 		this.taskManagerLocation = slot.getTaskManagerLocation();
 		this.slotNumber = slot.getRootSlotNumber();
 		this.resourceProfile = new ResourceProfile(0, 0);
-		this.taskManagerActorGateway = slot.getTaskManagerActorGateway();
+		this.taskManagerGateway = slot.getTaskManagerGateway();
 	}
 
 	/**
@@ -121,8 +122,8 @@ public class SlotDescriptor {
 	 *
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
-	public ActorGateway getTaskManagerActorGateway() {
-		return taskManagerActorGateway;
+	public TaskManagerGateway getTaskManagerGateway() {
+		return taskManagerGateway;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 02166a4..44df29b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -142,7 +142,7 @@ public class SlotPool implements SlotOwner {
 					SimpleSlot slot = new SimpleSlot(
 							descriptor.getJobID(), SlotPool.this,
 							descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
-							descriptor.getTaskManagerActorGateway());
+							descriptor.getTaskManagerGateway());
 					synchronized (lock) {
 						// double validation since we are out of the lock protection after the slot is granted
 						if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
@@ -198,7 +198,7 @@ public class SlotPool implements SlotOwner {
 				public SharedSlot apply(SlotDescriptor descriptor) {
 					SharedSlot slot = new SharedSlot(
 							descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(),
-							descriptor.getSlotNumber(), descriptor.getTaskManagerActorGateway(),
+							descriptor.getSlotNumber(), descriptor.getTaskManagerGateway(),
 							sharingGroupAssignment);
 
 					synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 5eb8e6a..071e706 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -853,11 +853,12 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 					jobMasterLeaderId,
 					jobMasterGateway,
 					new TaskExecutionState(
-							task.getJobID(),
-							task.getExecutionId(),
-							task.getExecutionState(),
-							task.getFailureCause(),
-							accumulatorSnapshot));
+						task.getJobID(),
+						task.getExecutionId(),
+						task.getExecutionState(),
+						task.getFailureCause(),
+						accumulatorSnapshot,
+						task.getMetricGroup().getIOMetricGroup().createSnapshot()));
 		} else {
 			log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 7e9fbfa..eeba532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -929,7 +929,7 @@ public class Task implements Runnable, TaskActions {
 								taskNameWithSubtask,
 								taskCancellationInterval,
 								taskCancellationTimeout,
-								taskManagerConnection,
+								taskManagerActions,
 								producedPartitions,
 								inputGates);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
@@ -1229,7 +1229,7 @@ public class Task implements Runnable, TaskActions {
 		private final long interruptTimeout;
 
 		/** TaskManager to notify about a timeout */
-		private final TaskManagerConnection taskManager;
+		private final TaskManagerActions taskManagerActions;
 
 		/** Watch Dog thread */
 		@Nullable
@@ -1242,7 +1242,7 @@ public class Task implements Runnable, TaskActions {
 				String taskName,
 				long cancellationInterval,
 				long cancellationTimeout,
-				TaskManagerConnection taskManager,
+				TaskManagerActions taskManagerActions,
 				ResultPartition[] producedPartitions,
 				SingleInputGate[] inputGates) {
 
@@ -1252,7 +1252,7 @@ public class Task implements Runnable, TaskActions {
 			this.taskName = taskName;
 			this.interruptInterval = cancellationInterval;
 			this.interruptTimeout = cancellationTimeout;
-			this.taskManager = taskManager;
+			this.taskManagerActions = taskManagerActions;
 			this.producedPartitions = producedPartitions;
 			this.inputGates = inputGates;
 
@@ -1367,7 +1367,7 @@ public class Task implements Runnable, TaskActions {
 								duration,
 								bld.toString());
 
-						taskManager.notifyFatalError(msg, null);
+						taskManagerActions.notifyFatalError(msg, null);
 
 						return; // done, don't forget to leave the loop
 					} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5ecfb80..f7c462f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,6 +22,7 @@ import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
+import java.util.concurrent.TimeUnit
 import java.util.{Collections, UUID}
 
 import _root_.akka.actor._
@@ -1961,12 +1962,4 @@ object TaskManager {
 
     (protocol, hostname, port)
   }
-
-
-  // --------------------------------------------------------------------------
-  //  Miscellaneous Utilities
-  // --------------------------------------------------------------------------
-
-  /**
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 872810f..8e31085 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.junit.Test;
 
@@ -118,6 +119,6 @@ public class AvailableSlotsTest {
 	{
 		TaskManagerLocation location = mock(TaskManagerLocation.class);
 		when(location.getResourceID()).thenReturn(resourceID);
-		return new SlotDescriptor(jobID, location, slotNumber, resourceProfile, mock(ActorGateway.class));
+		return new SlotDescriptor(jobID, location, slotNumber, resourceProfile, mock(TaskManagerGateway.class));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4c4d8eb1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 369e6d1..06add39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -1429,7 +1429,8 @@ public class TaskManagerTest extends TestLogger {
 				false // don't deploy eagerly but with the first completed memory buffer
 			);
 
-			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
+			final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
+				jid, new AllocationID(), "TestJob", vid, eid, executionConfig,
 				"TestTask", 1, 0, 1, 0, new Configuration(), new Configuration(),
 				TestInvokableRecordCancel.class.getName(),
 				Collections.singletonList(resultPartitionDeploymentDescriptor),


Mime
View raw message