flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/8] flink git commit: [hotfix] [core] Migrate ConnectionUtils from System.currentTimeMillis() to System.nanoTime()
Date Wed, 11 Jan 2017 20:19:40 GMT
[hotfix] [core] Migrate ConnectionUtils from System.currentTimeMillis() to System.nanoTime()

This change makes the code robust against concurrent clock adjustments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c789bf9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c789bf9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c789bf9

Branch: refs/heads/master
Commit: 3c789bf937444daf10f1da42d857597dc9814d8a
Parents: 986a03f
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 6 14:23:17 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Jan 11 19:43:47 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/net/ConnectionUtils.java      | 34 +++++++++++---------
 .../flink/runtime/net/ConnectionUtilsTest.java  | 13 +++-----
 2 files changed, 23 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c789bf9/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
index dcf5a62..75a7ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/ConnectionUtils.java
@@ -45,6 +45,9 @@ import scala.concurrent.duration.FiniteDuration;
 /**
  * Utilities to determine the network interface and address that should be used to bind the
  * TaskManager communication to.
+ * 
+ * <p>Implementation note: This class uses {@code System.nanoTime()} to measure elapsed
time, because
+ * that is not susceptible to clock changes.
  */
 public class ConnectionUtils {
 	
@@ -110,10 +113,10 @@ public class ConnectionUtils {
 			throw new IllegalArgumentException("Max wait time must be positive");
 		}
 
-		final long startTime = System.currentTimeMillis();
+		final long startTimeNanos = System.nanoTime();
 
 		long currentSleepTime = MIN_SLEEP_TIME;
-		long elapsedTime = 0;
+		long elapsedTimeMillis = 0;
 
 		final List<AddressDetectionState> strategies = Collections.unmodifiableList(
 			Arrays.asList(
@@ -123,8 +126,8 @@ public class ConnectionUtils {
 				AddressDetectionState.SLOW_CONNECT));
 
 		// loop while there is time left
-		while (elapsedTime < maxWaitMillis) {
-			boolean logging = elapsedTime >= startLoggingAfter;
+		while (elapsedTimeMillis < maxWaitMillis) {
+			boolean logging = elapsedTimeMillis >= startLoggingAfter;
 			if (logging) {
 				LOG.info("Trying to connect to " + targetAddress);
 			}
@@ -139,9 +142,9 @@ public class ConnectionUtils {
 
 			// we have made a pass with all strategies over all interfaces
 			// sleep for a while before we make the next pass
-			elapsedTime = System.currentTimeMillis() - startTime;
+			elapsedTimeMillis = (System.nanoTime() - startTimeNanos) / 1_000_000;
 
-			long toWait = Math.min(maxWaitMillis - elapsedTime, currentSleepTime);
+			long toWait = Math.min(maxWaitMillis - elapsedTimeMillis, currentSleepTime);
 			if (toWait > 0) {
 				if (logging) {
 					LOG.info("Could not connect. Waiting for {} msecs before next attempt", toWait);
@@ -363,15 +366,16 @@ public class ConnectionUtils {
 				FiniteDuration timeout,
 				FiniteDuration startLoggingAfter)
 			throws LeaderRetrievalException {
-			long startTime = System.currentTimeMillis();
+
+			final long startTimeNanos = System.nanoTime();
 			long currentSleepTime = MIN_SLEEP_TIME;
-			long elapsedTime = 0;
+			long elapsedTimeMillis = 0;
 			InetSocketAddress targetAddress = null;
 
 			try {
-				while (elapsedTime < timeout.toMillis()) {
+				while (elapsedTimeMillis < timeout.toMillis()) {
 
-					long maxTimeout = timeout.toMillis() - elapsedTime;
+					long maxTimeout = timeout.toMillis() - elapsedTimeMillis;
 
 					synchronized (retrievalLock) {
 						if (exception != null) {
@@ -401,7 +405,7 @@ public class ConnectionUtils {
 					if (targetAddress != null) {
 						AddressDetectionState strategy = AddressDetectionState.LOCAL_HOST;
 
-						boolean logging = elapsedTime >= startLoggingAfter.toMillis();
+						boolean logging = elapsedTimeMillis >= startLoggingAfter.toMillis();
 						if (logging) {
 							LOG.info("Trying to connect to address {}", targetAddress);
 						}
@@ -433,10 +437,10 @@ public class ConnectionUtils {
 						while (strategy != null);
 					}
 
-					elapsedTime = System.currentTimeMillis() - startTime;
+					elapsedTimeMillis = (System.nanoTime() - startTimeNanos) / 1_000_000;
 
 					long timeToWait = Math.min(
-							Math.max(timeout.toMillis() - elapsedTime, 0),
+							Math.max(timeout.toMillis() - elapsedTimeMillis, 0),
 							currentSleepTime);
 
 					if (timeToWait > 0) {
@@ -448,7 +452,7 @@ public class ConnectionUtils {
 							}
 						}
 
-						elapsedTime = System.currentTimeMillis() - startTime;
+						elapsedTimeMillis = (System.nanoTime() - startTimeNanos) / 1_000_000;
 					}
 				}
 
@@ -473,7 +477,7 @@ public class ConnectionUtils {
 
 		@Override
 		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
-			if (leaderAddress != null && !leaderAddress.equals("")) {
+			if (leaderAddress != null && !leaderAddress.isEmpty()) {
 				synchronized (retrievalLock) {
 					akkaURL = leaderAddress;
 					retrievalState = LeaderRetrievalState.NEWLY_RETRIEVED;

http://git-wip-us.apache.org/repos/asf/flink/blob/3c789bf9/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
index 13a8214..b5c2819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/net/ConnectionUtilsTest.java
@@ -34,7 +34,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests for the network utilities.
@@ -44,18 +43,18 @@ import static org.junit.Assert.fail;
 public class ConnectionUtilsTest {
 
 	@Test
-	public void testReturnLocalHostAddressUsingHeuristics() {
+	public void testReturnLocalHostAddressUsingHeuristics() throws Exception {
 		try (ServerSocket blocker = new ServerSocket(0, 1, InetAddress.getLocalHost())) {
 			// the "blocker" server socket simply does not accept connections
 			// this address is consequently "unreachable"
 			InetSocketAddress unreachable = new InetSocketAddress("localhost", blocker.getLocalPort());
 			
-			final long start = System.currentTimeMillis();
+			final long start = System.nanoTime();
 			InetAddress add = ConnectionUtils.findConnectingAddress(unreachable, 2000, 400);
 
-			// check that it did not take forever
+			// check that it did not take forever (max 30 seconds)
 			// this check can unfortunately not be too tight, or it will be flaky on some CI infrastructure
-			assertTrue(System.currentTimeMillis() - start < 30000);
+			assertTrue(System.nanoTime() - start < 30_000_000_000L);
 
 			// we should have found a heuristic address
 			assertNotNull(add);
@@ -63,10 +62,6 @@ public class ConnectionUtilsTest {
 			// make sure that we returned the InetAddress.getLocalHost as a heuristic
 			assertEquals(InetAddress.getLocalHost(), add);
 		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
 	}
 
 	@Test


Mime
View raw message