flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] git commit: [FLINK-1170] Fix faulty input split localization Pass hostname to split assigner Avoid clashes by only using the first component of the fully qualified hostname
Date Sat, 18 Oct 2014 15:13:46 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master ba44459f6 -> 23e30f09d


[FLINK-1170] Fix faulty input split localization
Pass hostname to split assigner
Avoid clashes by only using the first component of the fully qualified hostname


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b6f599f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b6f599f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b6f599f1

Branch: refs/heads/master
Commit: b6f599f1ed27a28ee0f8be7176f06a5fa43fa310
Parents: ba44459
Author: Robert Metzger <rmetzger@apache.org>
Authored: Fri Oct 17 10:58:15 2014 +0200
Committer: Robert Metzger <metzgerr@web.de>
Committed: Sat Oct 18 17:13:06 2014 +0200

----------------------------------------------------------------------
 .../common/io/LocatableInputSplitAssigner.java  | 12 +++---
 .../flink/util/MutableObjectIterator.java       |  2 -
 .../java/org/apache/flink/util/NetUtils.java    | 36 ++++++++++++++++
 .../instance/InstanceConnectionInfo.java        | 44 +++++++++++++++-----
 .../runtime/io/network/ChannelManager.java      |  2 +-
 .../flink/runtime/jobmanager/JobManager.java    | 18 +++++++-
 .../jobmanager/web/JobmanagerInfoServlet.java   |  2 +-
 .../runtime/jobmanager/web/JsonFactory.java     |  2 +-
 .../org/apache/flink/runtime/net/NetUtils.java  |  2 -
 .../protocols/InputSplitProviderProtocol.java   |  3 +-
 .../taskmanager/TaskInputSplitProvider.java     |  8 +++-
 .../flink/runtime/taskmanager/TaskManager.java  |  2 +-
 .../instance/InstanceConnectionInfoTest.java    | 37 ++++++++++++++--
 13 files changed, 137 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
index fb01562..4750425 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/LocatableInputSplitAssigner.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.LocatableInputSplit;
+import org.apache.flink.util.NetUtils;
 
 /**
  * The locatable input split assigner assigns to each host splits that are local, before
assigning
@@ -81,7 +82,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner
{
 					return next;
 				} else {
 					if (LOG.isDebugEnabled()) {
-						LOG.debug("No more input splits remaining.");
+						LOG.debug("No more unassigned input splits remaining.");
 					}
 					return null;
 				}
@@ -126,6 +127,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner
{
 			}
 		}
 		
+		
 		// at this point, we have a list of local splits (possibly empty)
 		// we need to make sure no one else operates in the current list (that protects against
 		// list creation races) and that the unassigned set is consistent
@@ -173,13 +175,13 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner
{
 		}
 	}
 	
-	private static final boolean isLocal(String host, String[] hosts) {
-		if (host == null || hosts == null) {
+	private static final boolean isLocal(String flinkHost, String[] hosts) {
+		if (flinkHost == null || hosts == null) {
 			return false;
 		}
-		
 		for (String h : hosts) {
-			if (h != null && host.equals(h.toLowerCase())) {
+			final String hadoopHost = NetUtils.getHostnameFromFQDN(h.toLowerCase());
+			if (h != null && hadoopHost.toLowerCase().equals(flinkHost)) {
 				return true;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
index 017b10a..b7b41d4 100644
--- a/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
+++ b/flink-core/src/main/java/org/apache/flink/util/MutableObjectIterator.java
@@ -15,8 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.flink.util;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
new file mode 100644
index 0000000..ead61ba
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.util;
+
+
+public class NetUtils {
+	/**
+	 * Turns a fully qualified domain name (FQDN) into a hostname by keeping only the text
+	 * before the first '.'; input without a dot is returned unchanged (a dotted IP is cut too).
+	 * @param fqdn the name to shorten; must not be null, since indexOf is called on it directly
+	 * @return the substring of fqdn before its first '.', or fqdn itself if it contains no '.'
+	 */
+	public static String getHostnameFromFQDN(String fqdn) {
+		int dotPos = fqdn.indexOf('.');
+		if(dotPos == -1) {
+			return fqdn;
+		} else {
+			return fqdn.substring(0, dotPos);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index 360bd5f..bb4171f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -25,7 +25,10 @@ import java.net.UnknownHostException;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class encapsulates all connection information necessary to connect to the instance's
task manager.
@@ -33,6 +36,9 @@ import org.apache.flink.util.StringUtils;
 public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>,
java.io.Serializable {
 
 	private static final long serialVersionUID = -8254407801276350716L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(InstanceConnectionInfo.class);
+	
 
 	/**
 	 * The network address the instance's task manager binds its sockets to.
@@ -50,9 +56,14 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	private int dataPort;
 
 	/**
-	 * The host name of the instance.
+	 * The fully qualified host name of the instance.
+	 */
+	private String fqdnHostName;
+	
+	/**
+	 * The short hostname: the part of the fully qualified host name before its first dot.
 	 */
-	private String hostName;
+	private String hostname;
 
 
 	/**
@@ -123,19 +134,30 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 	 * 
 	 * @return the host name of the instance
 	 */
-	public String hostname() {
-		if (this.hostName == null) {
+	public String getFQDNHostname() {
+		if (this.fqdnHostName == null) {
 			try {
-				this.hostName = this.inetAddress.getCanonicalHostName();
+				this.fqdnHostName = this.inetAddress.getCanonicalHostName();
 			} catch (Throwable t) {
+				LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded
since HDFS input split assignment is not possible");
+				if(LOG.isDebugEnabled()) {
+					LOG.debug("getCanonicalHostName() Exception", t);
+				}
 				// could not determine host name, so take IP textual representation
-				this.hostName = inetAddress.getHostAddress();
+				this.fqdnHostName = inetAddress.getHostAddress();
 			}
 		}
-		return this.hostName;
+		return this.fqdnHostName;
 	}
-
 	
+	public String getHostname() {
+		if(hostname == null) {
+			String fqdn = getFQDNHostname();
+			hostname = NetUtils.getHostnameFromFQDN(fqdn);
+		}
+		return hostname;
+	}
+
 	public String getInetAdress() {
 		return this.inetAddress.toString();
 	}
@@ -154,7 +176,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		this.ipcPort = in.readInt();
 		this.dataPort = in.readInt();
 		
-		this.hostName = StringUtils.readNullableString(in);
+		this.fqdnHostName = StringUtils.readNullableString(in);
 
 		try {
 			this.inetAddress = InetAddress.getByAddress(address);
@@ -172,7 +194,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 		out.writeInt(this.ipcPort);
 		out.writeInt(this.dataPort);
 		
-		StringUtils.writeNullableString(hostName, out);
+		StringUtils.writeNullableString(fqdnHostName, out);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -181,7 +203,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
 
 	@Override
 	public String toString() {
-		return hostname() + " (ipcPort=" + ipcPort + ", dataPort=" + dataPort + ")";
+		return getFQDNHostname() + " (ipcPort=" + ipcPort + ", dataPort=" + dataPort + ")";
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index 59084e9..8f91814 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -259,7 +259,7 @@ public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker
 		// need at least one buffer per channel
 		if (numChannels > 0 && numBuffers / numChannels < 1) {
 			String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers
missing)",
-					this.connectionInfo.hostname(), env.getTaskName(), numChannels - numBuffers);
+					this.connectionInfo.getFQDNHostname(), env.getTaskName(), numChannels - numBuffers);
 
 			throw new InsufficientResourcesException(msg);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 910eba3..c93eee3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -58,9 +58,12 @@ import org.apache.flink.runtime.client.JobSubmissionResult;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Hardware;
 import org.apache.flink.runtime.instance.HardwareDescription;
 import org.apache.flink.runtime.instance.Instance;
@@ -453,7 +456,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	}
 	
 	@Override
-	public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId) throws IOException
{
+	public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID
executionAttempt) throws IOException {
 
 		final ExecutionGraph graph = this.currentJobs.get(jobID);
 		if (graph == null) {
@@ -473,8 +476,19 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			return null;
 		}
 		
+		// get hostname for input split assignment
+		String host = null;
+		Execution execution = graph.getRegisteredExecutions().get(executionAttempt);
+		if(execution == null) {
+			LOG.error("Can not find Execution for attempt " + executionAttempt);
+		} else {
+			AllocatedSlot slot = execution.getAssignedResource();
+			if(slot != null) {
+				host = slot.getInstance().getInstanceConnectionInfo().getHostname();
+			}
+		}
 		
-		return splitAssigner.getNextInputSplit(null);
+		return splitAssigner.getNextInputSplit(host);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index e63adbd..ef5a246 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -260,7 +260,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 								wrt.write(",");
 							}
 							wrt.write("{");
-							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().hostname())
+ "\",");
+							wrt.write("\"node\": \"" + (slot == null ? "(none)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname())
+ "\",");
 							wrt.write("\"message\": \"" + (failureCause == null ? "" : StringUtils.escapeHtml(ExceptionUtils.stringifyException(failureCause)))
+ "\"");
 							wrt.write("}");
 						}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
index 0022660..e4440c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JsonFactory.java
@@ -40,7 +40,7 @@ public class JsonFactory {
 		json.append("\"vertexstatus\": \"" + vertex.getExecutionState() + "\",");
 		
 		AllocatedSlot slot = vertex.getCurrentAssignedResource();
-		String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().hostname();
+		String instanceName = slot == null ? "(null)" : slot.getInstance().getInstanceConnectionInfo().getFQDNHostname();
 		
 		json.append("\"vertexinstancename\": \"" + instanceName + "\"");
 		json.append("}");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 5b08d45..1d7af65 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -226,7 +226,6 @@ public class NetUtils {
 	 * @return InputStream for reading from the socket.
 	 * @throws IOException
 	 */
-	@SuppressWarnings("resource")
 	public static InputStream getInputStream(Socket socket, long timeout) throws IOException
{
 		return (socket.getChannel() == null) ? socket.getInputStream() : new SocketInputStream(socket,
timeout);
 	}
@@ -272,7 +271,6 @@ public class NetUtils {
 	 * @return OutputStream for writing to the socket.
 	 * @throws IOException
 	 */
-	@SuppressWarnings("resource")
 	public static OutputStream getOutputStream(Socket socket, long timeout) throws IOException
{
 		return (socket.getChannel() == null) ? socket.getOutputStream() : new SocketOutputStream(socket,
timeout);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
index c90cd02..be1846a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.protocols.VersionedProtocol;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -30,5 +31,5 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
  */
 public interface InputSplitProviderProtocol extends VersionedProtocol {
 
-	InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex) throws IOException;
+	InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt)
throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 7d87aa9..d4e1b7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskmanager;
 import java.io.IOException;
 
 import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -34,16 +35,19 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	
 	private final JobVertexID vertexId;
 	
-	public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID
vertexId) {
+	private final ExecutionAttemptID executionAttempt;
+	
+	public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID
vertexId, ExecutionAttemptID executionAttempt) {
 		this.protocol = protocol;
 		this.jobId = jobId;
 		this.vertexId = vertexId;
+		this.executionAttempt = executionAttempt;
 	}
 
 	@Override
 	public InputSplit getNextInputSplit() {
 		try {
-			return protocol.requestNextInputSplit(jobId, vertexId);
+			return protocol.requestNextInputSplit(jobId, vertexId, executionAttempt);
 		}
 		catch (IOException e) {
 			throw new RuntimeException("Requesting the next InputSplit failed.", e);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 1498a1c..f27f311 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -590,7 +590,7 @@ public class TaskManager implements TaskOperationProtocol {
 				throw new Exception("TaskManager contains already a task with executionId " + executionId);
 			}
 			
-			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider,
jobID, vertexId);
+			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider,
jobID, vertexId, executionId);
 			final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader,
this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy);
 			task.setEnvironment(env);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b6f599f1/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
index 2eeddee..17dedd3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceConnectionInfoTest.java
@@ -25,7 +25,10 @@ import static org.junit.Assert.fail;
 import java.net.InetAddress;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.reflect.Whitebox;
 
 public class InstanceConnectionInfoTest {
 
@@ -79,7 +82,7 @@ public class InstanceConnectionInfoTest {
 			// with resolved hostname
 			{
 				InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"),
10523, 19871);
-				original.hostname();
+				original.getFQDNHostname();
 				
 				InstanceConnectionInfo copy = CommonTestUtils.createCopyWritable(original);
 				assertEquals(original, copy);
@@ -95,17 +98,43 @@ public class InstanceConnectionInfoTest {
 	}
 	
 	@Test
-	public void testGetHostname() {
+	public void testGetFQDNHostname() {
 		try {
 			InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"),
10523, 19871);
-			assertTrue(info1.hostname() != null);
+			assertTrue(info1.getFQDNHostname() != null);
 			
 			InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"),
9999, 8888);
-			assertTrue(info2.hostname() != null);
+			assertTrue(info2.getFQDNHostname() != null);
 		}
 		catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testGetHostname0() {
+		try {
+			final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"),
10523, 19871));
+			Whitebox.setInternalState(info1, "fqdnHostName", "worker2.cluster.mycompany.com");
+			Assert.assertEquals("worker2", info1.getHostname());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testGetHostname1() {
+		try {
+			final InstanceConnectionInfo info1 = PowerMockito.spy(new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"),
10523, 19871));
+			Whitebox.setInternalState(info1, "fqdnHostName", "worker10");
+			Assert.assertEquals("worker10", info1.getHostname());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	
 }


Mime
View raw message