flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs
Date Thu, 01 Oct 2015 07:16:55 GMT
[FLINK-2766] [core] Add proper handling of IPv6 address literals in URLs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfde1b73
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfde1b73
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfde1b73

Branch: refs/heads/master
Commit: bfde1b7379f0a0aef21c836e5ac0842474c7f98f
Parents: a3c0b44
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Sep 29 17:45:06 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Oct 1 08:59:56 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/util/NetUtils.java    |  80 +++++++++
 .../org/apache/flink/util/NetUtilsTest.java     |  97 +++++++++++
 .../apache/flink/runtime/client/JobClient.java  |   7 +-
 .../flink/runtime/net/ConnectionUtils.java      |   2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |  42 +++--
 .../flink/runtime/jobmanager/JobManager.scala   |  20 +--
 .../runtime/minicluster/FlinkMiniCluster.scala  |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../TaskManagerProcessReapingTest.java          |   9 +-
 .../flink/runtime/akka/AkkaUtilsTest.scala      |  46 +++++-
 .../fs/RollingSinkFaultTolerance2ITCase.java    |   9 +-
 .../fs/RollingSinkFaultToleranceITCase.java     |   9 +-
 .../connectors/fs/RollingSinkITCase.java        |   5 +-
 .../connectors/kafka/KafkaConsumerTestBase.java |  27 ++-
 .../connectors/kafka/KafkaTestBase.java         |   3 +-
 .../test/util/ForkableFlinkMiniCluster.scala    |   2 +-
 .../flink/test/runtime/IPv6HostnamesITCase.java | 165 +++++++++++++++++++
 17 files changed, 480 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 03721c5..da445ec 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -15,13 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.util;
 
+import com.google.common.net.InetAddresses;
 
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
+import java.net.UnknownHostException;
 
 public class NetUtils {
 	
@@ -68,6 +75,10 @@ public class NetUtils {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Lookup of free ports
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Find a non-occupied port.
 	 *
@@ -86,4 +97,73 @@ public class NetUtils {
 
 		throw new RuntimeException("Could not find a free permitted port on the machine.");
 	}
+	
+
+	// ------------------------------------------------------------------------
+	//  Encoding of IP addresses for URLs
+	// ------------------------------------------------------------------------
+	
+	/**
+	 * Encodes an IP address properly as a URL string. This method makes sure that IPv6 addresses
+	 * have the proper formatting to be included in URLs.
+	 * <p>
+	 * This method internally uses Guava's functionality to properly encode IPv6 addresses.
+	 * 
+	 * @param address The IP address to encode.
+	 * @return The proper URL string encoded IP address.
+	 */
+	public static String ipAddressToUrlString(InetAddress address) {
+		if (address == null) {
+			throw new NullPointerException("address is null");
+		}
+		else if (address instanceof Inet4Address) {
+			return address.getHostAddress();
+		}
+		else if (address instanceof Inet6Address) {
+			return '[' + InetAddresses.toAddrString(address) + ']';
+		}
+		else {
+			throw new IllegalArgumentException("Unrecognized type of InetAddress: " + address);
+		}
+	}
+
+	/**
+	 * Encodes an IP address and port to be included in a URL. In particular, this method makes
+	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
+	 * 
+	 * @param address The address to be included in the URL.
+	 * @param port The port for the URL address.
+	 * @return The proper URL string encoded IP address and port.
+	 */
+	public static String ipAddressAndPortToUrlString(InetAddress address, int port) {
+		return ipAddressToUrlString(address) + ':' + port;
+	}
+
+	/**
+	 * Encodes an IP address and port to be included in a URL. In particular, this method makes
+	 * sure that IPv6 addresses have the proper formatting to be included in URLs.
+	 * 
+	 * @param address The socket address with the IP address and port.
+	 * @return The proper URL string encoded IP address and port.
+	 */
+	public static String socketAddressToUrlString(InetSocketAddress address) {
+		if (address.isUnresolved()) {
+			throw new IllegalArgumentException("Address cannot be resolved: " + address.getHostString());
+		}
+		return ipAddressAndPortToUrlString(address.getAddress(), address.getPort());
+	}
+
+	/**
+	 * Normalizes and encodes a hostname and port to be included in URL. 
+	 * In particular, this method makes sure that IPv6 address literals have the proper
+	 * formatting to be included in URLs.
+	 *
+	 * @param host The address to be included in the URL.
+	 * @param port The port for the URL address.
+	 * @return The proper URL string encoded IP address and port.
+	 * @throws java.net.UnknownHostException Thrown, if the hostname cannot be translated into a URL.
+	 */
+	public static String hostAndPortToUrlString(String host, int port) throws UnknownHostException
{
+		return ipAddressAndPortToUrlString(InetAddress.getByName(host), port);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
new file mode 100644
index 0000000..cd2c13b
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import static org.junit.Assert.*;
+
+public class NetUtilsTest {
+
+	@Test
+	public void testIPv4toURL() {
+		try {
+			final String addressString = "192.168.0.1";
+
+			InetAddress address = InetAddress.getByName(addressString);
+			assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testIPv6toURL() {
+		try {
+			final String addressString = "2001:01db8:00:0:00:ff00:42:8329";
+			final String normalizedAddress = "[2001:1db8::ff00:42:8329]";
+
+			InetAddress address = InetAddress.getByName(addressString);
+			assertEquals(normalizedAddress, NetUtils.ipAddressToUrlString(address));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testIPv4URLEncoding() {
+		try {
+			final String addressString = "10.244.243.12";
+			final int port = 23453;
+			
+			InetAddress address = InetAddress.getByName(addressString);
+			InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+			
+			assertEquals(addressString, NetUtils.ipAddressToUrlString(address));
+			assertEquals(addressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address,
port));
+			assertEquals(addressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testIPv6URLEncoding() {
+		try {
+			final String addressString = "2001:db8:10:11:12:ff00:42:8329";
+			final String bracketedAddressString = '[' + addressString + ']';
+			final int port = 23453;
+
+			InetAddress address = InetAddress.getByName(addressString);
+			InetSocketAddress socketAddress = new InetSocketAddress(address, port);
+
+			assertEquals(bracketedAddressString, NetUtils.ipAddressToUrlString(address));
+			assertEquals(bracketedAddressString + ':' + port, NetUtils.ipAddressAndPortToUrlString(address,
port));
+			assertEquals(bracketedAddressString + ':' + port, NetUtils.socketAddressToUrlString(socketAddress));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index c51bc7c..a436881 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.util.SerializedThrowable;
 
+import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -74,9 +75,11 @@ public class JobClient {
 		ActorSystem system = AkkaUtils.createActorSystem(config, remoting);
 		Address address = system.provider().getDefaultAddress();
 
-		String host = address.host().isDefined() ? address.host().get() : "(unknown)";
+		String hostAddress = address.host().isDefined() ?
+				NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
+				"(unknown)";
 		int port = address.port().isDefined() ? ((Integer) address.port().get()) : -1;
-		LOG.info("Started JobClient actor system at " + host + ':' + port);
+		LOG.info("Started JobClient actor system at " + hostAddress + ':' + port);
 
 		return system;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index 0ed9345..542e69e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -365,7 +365,7 @@ public class ConnectionUtils {
 
 						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
 						if (logging) {
-							LOG.info("Trying to connect to address {}." + targetAddress);
+							LOG.info("Trying to connect to address {}", targetAddress);
 						}
 
 						do {

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 8007ef6..bf679c9 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -19,13 +19,14 @@
 package org.apache.flink.runtime.akka
 
 import java.io.IOException
-import java.net.{InetSocketAddress, InetAddress}
+import java.net._
 import java.util.concurrent.{TimeUnit, Callable}
 
 import akka.actor._
 import akka.pattern.{ask => akkaAsk}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
+import org.apache.flink.util.NetUtils
 import org.jboss.netty.logging.{Slf4JLoggerFactory, InternalLoggerFactory}
 import org.slf4j.LoggerFactory
 import scala.concurrent._
@@ -102,6 +103,7 @@ object AkkaUtils {
    *                         then an Akka config for local actor system will be returned
    * @return Akka config
    */
+  @throws(classOf[UnknownHostException])
   def getAkkaConfig(configuration: Configuration,
                     listeningAddress: Option[(String, Int)]): Config = {
     val defaultConfig = getBasicAkkaConfig(configuration)
@@ -109,8 +111,9 @@ object AkkaUtils {
     listeningAddress match {
 
       case Some((hostname, port)) =>
-        val ipAddress = "\"" + InetAddress.getByName(hostname).getHostAddress() + "\""
-        val remoteConfig = getRemoteAkkaConfig(configuration, ipAddress, port)
+        val ipAddress = InetAddress.getByName(hostname)
+        val hostString = "\"" + NetUtils.ipAddressToUrlString(ipAddress) + "\""
+        val remoteConfig = getRemoteAkkaConfig(configuration, hostString, port)
         remoteConfig.withFallback(defaultConfig)
 
       case None =>
@@ -513,23 +516,30 @@ object AkkaUtils {
     * the Akka URL does not contain the hostname and port information, e.g. a local Akka URL is
     * provided, then an [[Exception]] is thrown.
     *
-    * @param akkaURL
-    * @throws java.lang.Exception
-    * @return
+    * @param akkaURL The URL to extract the host and port from.
+    * @throws java.lang.Exception Thrown, if the given string does not represent a proper url
+    * @return The InetSocketAddress with the extracted host and port.
     */
   @throws(classOf[Exception])
   def getInetSockeAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
     // AkkaURLs have the form schema://systemName@host:port/.... if it's a remote Akka URL
-    val hostPortRegex = """@([^/:]*):(\d*)""".r
-
-    hostPortRegex.findFirstMatchIn(akkaURL) match {
-      case Some(m) =>
-        val host = m.group(1)
-        val port = m.group(2).toInt
-
-        new InetSocketAddress(host, port)
-      case None => throw new Exception("Could not retrieve InetSocketAddress from " +
-        s"Akka URL $akkaURL")
+    try {
+      // we need to manually strip the protocol, because "akka.tcp" is not
+      // a valid protocol for Java's URL class
+      val protocolonPos = akkaURL.indexOf("://")
+      if (protocolonPos == -1 || protocolonPos >= akkaURL.length - 4) {
+        throw new MalformedURLException()
+      }
+      
+      val url = new URL("http://" + akkaURL.substring(protocolonPos + 3))
+      if (url.getHost == null || url.getPort == -1) {
+        throw new MalformedURLException()
+      }
+      new InetSocketAddress(url.getHost, url.getPort)
+    }
+    catch {
+      case _ : MalformedURLException =>
+        throw new Exception(s"Could not retrieve InetSocketAddress from Akka URL $akkaURL")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 07a5977..b7f76ce 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -44,12 +44,8 @@ import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, UpdateTaskExecutionState}
-
-import org.apache.flink.runtime.messages.accumulators.{AccumulatorResultsErroneous,
-AccumulatorResultsFound, RequestAccumulatorResults, AccumulatorMessage,
-AccumulatorResultStringsFound, RequestAccumulatorResultsStringified}
-import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
-AcknowledgeCheckpoint}
+import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor._
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
@@ -67,7 +63,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
+import org.apache.flink.util.{NetUtils, SerializedValue, ExceptionUtils, InstantiationUtil}
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -1237,7 +1233,8 @@ object JobManager {
     LOG.info("Starting JobManager")
 
     // Bring up the job manager actor system first, bind it to the given address.
-    LOG.info(s"Starting JobManager actor system at $listeningAddress:$listeningPort.")
+    val hostPortUrl = NetUtils.hostAndPortToUrlString(listeningAddress, listeningPort)
+    LOG.info(s"Starting JobManager actor system at $hostPortUrl")
 
     val jobManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1451,8 +1448,9 @@ object JobManager {
 
     val executionMode = config.getJobManagerMode
     val streamingMode = config.getStreamingMode
-
-    LOG.info(s"Starting JobManager on $host:$port with execution mode $executionMode and
" +
+    val hostPortUrl = NetUtils.hostAndPortToUrlString(host, port)
+    
+    LOG.info(s"Starting JobManager on $hostPortUrl with execution mode $executionMode and
" +
       s"streaming mode $streamingMode")
 
     (configuration, executionMode, streamingMode, host, port)
@@ -1657,7 +1655,7 @@ object JobManager {
       address: InetSocketAddress,
       name: Option[String] = None)
     : String = {
-    val hostPort = address.getAddress().getHostAddress() + ":" + address.getPort()
+    val hostPort = NetUtils.socketAddressToUrlString(address)
 
     getJobManagerAkkaURLHelper(s"akka.tcp://flink@$hostPort", name)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 839193b..b3cff51 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -76,7 +76,9 @@ abstract class FlinkMiniCluster(
 
   // NOTE: THIS MUST BE getByName("localhost"), which is 127.0.0.1 and
   // not getLocalHost(), which may be 127.0.1.1
-  val hostname = InetAddress.getByName("localhost").getHostAddress()
+  val hostname = userConfiguration.getString(
+    ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
+    InetAddress.getByName("localhost").getHostAddress())
 
   val configuration = generateConfiguration(userConfiguration)
 
@@ -243,7 +245,7 @@ abstract class FlinkMiniCluster(
     jobManagerActorSystems = Some(jmActorSystems)
     jobManagerActors = Some(jmActors)
 
-    val lrs = createLeaderRetrievalService();
+    val lrs = createLeaderRetrievalService()
 
     leaderRetrievalService = Some(lrs)
     lrs.start(this)

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 7a1bec5..bf23021 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1425,8 +1425,9 @@ object TaskManager {
     LOG.info(s"Starting TaskManager in streaming mode $streamingMode")
 
     // Bring up the TaskManager actor system first, bind it to the given address.
-
-    LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort")
+    
+    LOG.info("Starting TaskManager actor system at " + 
+      NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort))
 
     val taskManagerSystem = try {
       val akkaConfig = AkkaUtils.getAkkaConfig(
@@ -1443,7 +1444,7 @@ object TaskManager {
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException])
{
-            val address = taskManagerHostname + ":" + actorSystemPort
+            val address = NetUtils.hostAndPortToUrlString(taskManagerHostname, actorSystemPort)
             throw new IOException("Unable to bind TaskManager actor system to address " +
               address + " - " + cause.getMessage(), t)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 1334bcc..ed03ae7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -40,6 +40,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringWriter;
+import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -78,10 +79,11 @@ public class TaskManagerProcessReapingTest {
 			tempLogFile.deleteOnExit();
 			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
 
+			final InetAddress localhost = InetAddress.getByName("localhost");
 			final int jobManagerPort = NetUtils.getAvailablePort();
 
 			// start a JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost",
jobManagerPort);
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>(localhost.getHostAddress(),
jobManagerPort);
 			jmActorSystem = AkkaUtils.createActorSystem(
 					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
 
@@ -109,8 +111,9 @@ public class TaskManagerProcessReapingTest {
 
 			// grab the reference to the TaskManager. try multiple times, until the process
 			// is started and the TaskManager is up
-			String taskManagerActorName = String.format("akka.tcp://flink@%s:%d/user/%s",
-					"127.0.0.1", taskManagerPort, TaskManager.TASK_MANAGER_NAME());
+			String taskManagerActorName = String.format("akka.tcp://flink@%s/user/%s",
+					org.apache.flink.util.NetUtils.ipAddressAndPortToUrlString(localhost, taskManagerPort),
+					TaskManager.TASK_MANAGER_NAME());
 
 			ActorRef taskManagerRef = null;
 			Throwable lastError = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 59477db..4e08857 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.akka
 
-import java.net.{InetAddress, InetSocketAddress}
+import java.net.InetSocketAddress
 
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.junit.runner.RunWith
@@ -64,4 +64,48 @@ class AkkaUtilsTest
     result should equal(expected)
   }
 
+  test("getHostFromAkkaURL should handle 'akka.tcp' as protocol") {
+    val url = "akka.tcp://flink@localhost:1234/user/jobmanager"
+    val expected = new InetSocketAddress("localhost", 1234)
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(expected)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv4 addresses in URLs") {
+    val IPv4AddressString = "192.168.0.1"
+    val port = 1234
+    val address = new InetSocketAddress(IPv4AddressString, port)
+    
+    val url = s"akka://flink@$IPv4AddressString:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = s"akka://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
+
+  test("getHostFromAkkaURL should properly handle IPv6 addresses in 'akka.tcp' URLs") {
+    val IPv6AddressString = "2001:db8:10:11:12:ff00:42:8329"
+    val port = 1234
+    val address = new InetSocketAddress(IPv6AddressString, port)
+
+    val url = s"akka.tcp://flink@[$IPv6AddressString]:$port/user/jobmanager"
+
+    val result = AkkaUtils.getInetSockeAddressFromAkkaURL(url)
+
+    result should equal(address)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
index 9c70ed2..7d127ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -69,7 +70,6 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
 
 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
 
 	private static String outPath;
 
@@ -87,10 +87,9 @@ public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBa
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort()
+"/";
-
-		outPath = hdfsURI + "/string-non-rolling-out-no-checkpoint";
-
+		outPath = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(),  hdfsCluster.getNameNodePort())
+				+ "/string-non-rolling-out-no-checkpoint";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
index e0592e9..65904d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -64,7 +65,6 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 	private static MiniDFSCluster hdfsCluster;
 	private static org.apache.hadoop.fs.FileSystem dfs;
-	private static String hdfsURI;
 
 	private static String outPath;
 
@@ -82,10 +82,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-		outPath = hdfsURI + "/string-non-rolling-out";
-
+		outPath = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/string-non-rolling-out";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 008b4b6..9770f41 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -79,7 +80,9 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 
 		dfs = hdfsCluster.getFileSystem();
 
-		hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+		hdfsURI = "hdfs://"
+				+ NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort())
+				+ "/";
 	}
 
 	@AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 5016e7e..e9a5728 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -69,6 +69,7 @@ import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.Collector;
 
+import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.junit.Assert;
 
@@ -841,7 +842,11 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		while (firstPart.errorCode() != 0);
 		zkClient.close();
 
-		final String leaderToShutDown = firstPart.leader().get().connectionString();
+		final kafka.cluster.Broker leaderToShutDown = firstPart.leader().get();
+		final String leaderToShutDownConnection = 
+				NetUtils.hostAndPortToUrlString(leaderToShutDown.host(), leaderToShutDown.port());
+		
+		
 		final int leaderIdToShutDown = firstPart.leader().get().id();
 		LOG.info("Leader to shutdown {}", leaderToShutDown);
 
@@ -863,7 +868,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 		env
 				.addSource(kafkaSource)
 				.map(new PartitionValidatingMapper(parallelism, 1))
-				.map(new BrokerKillingMapper<Integer>(leaderToShutDown, failAfterElements))
+				.map(new BrokerKillingMapper<Integer>(leaderToShutDownConnection, failAfterElements))
 				.addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
 
 		BrokerKillingMapper.killedLeaderBefore = false;
@@ -1068,14 +1073,28 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
 					// shut down a Kafka broker
 					KafkaServer toShutDown = null;
 					for (KafkaServer kafkaServer : brokers) {
-						if (leaderToShutDown.equals(kafkaServer.config().advertisedHostName()+ ":"+ kafkaServer.config().advertisedPort())) {
+						String connectionUrl = 
+								NetUtils.hostAndPortToUrlString(
+										kafkaServer.config().advertisedHostName(),
+										kafkaServer.config().advertisedPort());
+						if (leaderToShutDown.equals(connectionUrl)) {
 							toShutDown = kafkaServer;
 							break;
 						}
 					}
 	
 					if (toShutDown == null) {
-						throw new Exception("Cannot find broker to shut down");
+						StringBuilder listOfBrokers = new StringBuilder();
+						for (KafkaServer kafkaServer : brokers) {
+							listOfBrokers.append(
+									NetUtils.hostAndPortToUrlString(
+											kafkaServer.config().advertisedHostName(),
+											kafkaServer.config().advertisedPort()));
+							listOfBrokers.append(" ; ");
+						}
+						
+						throw new Exception("Cannot find broker to shut down: " + leaderToShutDown
+								+ " ; available brokers: " + listOfBrokers.toString());
 					}
 					else {
 						hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 61f384a..d511796 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -59,6 +59,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.NetUtils.hostAndPortToUrlString;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -148,7 +149,7 @@ public abstract class KafkaTestBase extends TestLogger {
 				SocketServer socketServer = brokers.get(i).socketServer();
 				
 				String host = socketServer.host() == null ? "localhost" : socketServer.host();
-				brokerConnectionStrings += host+":"+socketServer.port()+",";
+				brokerConnectionStrings += hostAndPortToUrlString(host, socketServer.port()) + ",";
 			}
 
 			LOG.info("ZK and KafkaServer started.");

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index c8b0e0c..5c5a465 100644
--- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -96,7 +96,7 @@ class ForkableFlinkMiniCluster(
       ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
       ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-    if(jobManagerPort > 0) {
+    if (jobManagerPort > 0) {
       config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort + index)
     }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfde1b73/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
new file mode 100644
index 0000000..af51ed6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import scala.Some;
+
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.ServerSocket;
+import java.util.Enumeration;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class IPv6HostnamesITCase extends TestLogger {
+	
+	@Test
+	public void testClusterWithIPv6host() {
+
+		final Inet6Address ipv6address = getLocalIPv6Address();
+		if (ipv6address == null) {
+			System.err.println("--- Cannot find a non-loopback local IPv6 address, skipping IPv6HostnamesITCase");
+			return;
+		}
+
+		
+		
+		ForkableFlinkMiniCluster flink = null;
+		try {
+			final String addressString = ipv6address.getHostAddress();
+			log.info("Test will use IPv6 address " + addressString + " for connection tests");
+			
+			Configuration conf = new Configuration();
+			conf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, addressString);
+			conf.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, addressString);
+			conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			conf.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
+			
+			flink = new ForkableFlinkMiniCluster(conf, false, StreamingMode.BATCH_ONLY);
+			flink.start();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(addressString, flink.getLeaderRPCPort());
+
+			// get input data
+			DataSet<String> text = env.fromElements(WordCountData.TEXT.split("\n"));
+
+			DataSet<Tuple2<String, Integer>> counts =text
+					.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+						@Override
+						public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+							for (String token : value.toLowerCase().split("\\W+")) {
+								if (token.length() > 0) {
+									out.collect(new Tuple2<String, Integer>(token, 1));
+								}
+							}
+						}
+					})
+					.groupBy(0).sum(1);
+
+			List<Tuple2<String, Integer>> result = counts.collect();
+
+			TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (flink != null) {
+				flink.shutdown();
+			}
+		}
+	}
+	
+	
+	private Inet6Address getLocalIPv6Address() {
+		try {
+			Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
+			while (e.hasMoreElements()) {
+				NetworkInterface netInterface = e.nextElement();
+
+				// for each address of the network interface
+				Enumeration<InetAddress> ee = netInterface.getInetAddresses();
+				while (ee.hasMoreElements()) {
+					InetAddress addr = ee.nextElement();
+					
+					
+					if (addr instanceof Inet6Address && (!addr.isLoopbackAddress()) && (!addr.isAnyLocalAddress())) {
+						// see if it is possible to bind to the address
+						InetSocketAddress socketAddress = new InetSocketAddress(addr, 0);
+						
+						try {
+							log.info("Considering address " + addr);
+							
+							// test whether we can bind a socket to that address
+							log.info("Testing whether sockets can bind to " + addr);
+							ServerSocket sock = new ServerSocket();
+							sock.bind(socketAddress);
+							sock.close();
+
+							// test whether Akka's netty can bind to the address
+							log.info("Testing whether Akka can use " + addr);
+							int port = NetUtils.getAvailablePort();
+							ActorSystem as = AkkaUtils.createActorSystem(
+									new Configuration(),
+									new Some<scala.Tuple2<String, Object>>(new scala.Tuple2<String, Object>(addr.getHostAddress(), port)));
+							as.shutdown();
+
+							log.info("Using address " + addr);
+							return (Inet6Address) addr;
+						}
+						catch (IOException ignored) {
+							// fall through the loop
+						}
+					}
+				}
+			}
+			
+			return null;
+		}
+		catch (Exception e) {
+			return null;
+		}
+	}
+}


Mime
View raw message