cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject [01/19] cassandra git commit: Allow storage port to be configurable per node
Date Thu, 25 Jan 2018 20:11:58 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 4de7a65ed -> 59b5b6bef


http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
index 3ef7bbb..3884f5a 100644
--- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
+++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.collect.HashMultimap;
@@ -35,6 +34,7 @@ import org.junit.runner.RunWith;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
@@ -192,12 +192,12 @@ public class StorageServiceServerTest
         metadata.clearUnsafe();
 
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
 
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC1", "1");
@@ -209,22 +209,22 @@ public class StorageServiceServerTest
         Schema.instance.load(meta);
 
         Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
-                                                                                                            InetAddress.getByName("127.0.0.1"));
+                                                                                                            InetAddressAndPort.getByName("127.0.0.1"));
         assertEquals(2, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assertEquals(2, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assertEquals(2, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assertEquals(2, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
@@ -236,11 +236,11 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC1", "1");
@@ -251,19 +251,19 @@ public class StorageServiceServerTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, configOptions));
         Schema.instance.load(meta);
 
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")));
     }
@@ -274,11 +274,11 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
@@ -289,19 +289,19 @@ public class StorageServiceServerTest
         Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assert primaryRanges.isEmpty();
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assert primaryRanges.isEmpty();
 
         // endpoints in DC2 should have primary ranges which also cover DC1
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assert primaryRanges.size() == 2;
         assert primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assert primaryRanges.size() == 2;
         assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
@@ -313,11 +313,11 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
         // DC1
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.2"));
         // DC2
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
-        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddressAndPort.getByName("127.0.0.5"));
 
         Map<String, String> configOptions = new HashMap<>();
         configOptions.put("DC2", "2");
@@ -328,20 +328,20 @@ public class StorageServiceServerTest
         Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assertTrue(primaryRanges.isEmpty());
 
         primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name,
-                                                                                   InetAddress.getByName("127.0.0.2"));
+                                                                                   InetAddressAndPort.getByName("127.0.0.2"));
         assertTrue(primaryRanges.isEmpty());
 
         // endpoints in DC2 should have primary ranges which also cover DC1
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assertTrue(primaryRanges.size() == 2);
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("D"), new StringToken("A"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assertTrue(primaryRanges.size() == 2);
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
@@ -353,22 +353,22 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
         // DC1
-        Multimap<InetAddress, Token> dc1 = HashMultimap.create();
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+        Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create();
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("A"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("E"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("H"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("C"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("I"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("J"));
         metadata.updateNormalTokens(dc1);
         // DC2
-        Multimap<InetAddress, Token> dc2 = HashMultimap.create();
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+        Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create();
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("B"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("G"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("L"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("D"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("F"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("K"));
         metadata.updateNormalTokens(dc2);
 
         Map<String, String> configOptions = new HashMap<>();
@@ -380,14 +380,14 @@ public class StorageServiceServerTest
         Schema.instance.load(meta);
 
         // endpoints in DC1 should not have primary range
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assert primaryRanges.isEmpty();
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assert primaryRanges.isEmpty();
 
         // endpoints in DC2 should have primary ranges which also cover DC1
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assert primaryRanges.size() == 4;
         assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G")));
@@ -396,7 +396,7 @@ public class StorageServiceServerTest
         // the node covers range (L, A]
         assert primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assert primaryRanges.size() == 8;
         assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D")));
         assert primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F")));
@@ -418,23 +418,23 @@ public class StorageServiceServerTest
         metadata.clearUnsafe();
 
         // DC1
-        Multimap<InetAddress, Token> dc1 = HashMultimap.create();
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("A"));
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("E"));
-        dc1.put(InetAddress.getByName("127.0.0.1"), new StringToken("H"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("C"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("I"));
-        dc1.put(InetAddress.getByName("127.0.0.2"), new StringToken("J"));
+        Multimap<InetAddressAndPort, Token> dc1 = HashMultimap.create();
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("A"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("E"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.1"), new StringToken("H"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("C"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("I"));
+        dc1.put(InetAddressAndPort.getByName("127.0.0.2"), new StringToken("J"));
         metadata.updateNormalTokens(dc1);
 
         // DC2
-        Multimap<InetAddress, Token> dc2 = HashMultimap.create();
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("B"));
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("G"));
-        dc2.put(InetAddress.getByName("127.0.0.4"), new StringToken("L"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("D"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("F"));
-        dc2.put(InetAddress.getByName("127.0.0.5"), new StringToken("K"));
+        Multimap<InetAddressAndPort, Token> dc2 = HashMultimap.create();
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("B"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("G"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.4"), new StringToken("L"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("D"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("F"));
+        dc2.put(InetAddressAndPort.getByName("127.0.0.5"), new StringToken("K"));
         metadata.updateNormalTokens(dc2);
 
         Map<String, String> configOptions = new HashMap<>();
@@ -447,7 +447,7 @@ public class StorageServiceServerTest
         Schema.instance.load(meta);
 
         // endpoints in DC1 should have primary ranges which also cover DC2
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assertEquals(8, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("J"), new StringToken("K"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("K"), new StringToken("L"))));
@@ -459,7 +459,7 @@ public class StorageServiceServerTest
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("G"), new StringToken("H"))));
 
         // endpoints in DC1 should have primary ranges which also cover DC2
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assertEquals(4, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
@@ -467,7 +467,7 @@ public class StorageServiceServerTest
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("I"), new StringToken("J"))));
 
         // endpoints in DC2 should have primary ranges which also cover DC1
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.4"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.4"));
         assertEquals(4, primaryRanges.size());
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("F"), new StringToken("G"))));
@@ -476,7 +476,7 @@ public class StorageServiceServerTest
         // the node covers range (L, A]
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("L"), new StringToken("A"))));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.5"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.5"));
         assertTrue(primaryRanges.size() == 8);
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("D"))));
         assertTrue(primaryRanges.contains(new Range<Token>(new StringToken("E"), new StringToken("F"))));
@@ -497,23 +497,23 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
 
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.3"));
 
         Keyspace.clear("Keyspace1");
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simpleTransient(2));
         Schema.instance.load(meta);
 
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddress.getByName("127.0.0.3"));
+        primaryRanges = StorageService.instance.getPrimaryRangesForEndpoint(meta.name, InetAddressAndPort.getByName("127.0.0.3"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
     }
@@ -525,9 +525,9 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
 
-        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.2"));
-        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.3"));
+        metadata.updateNormalToken(new StringToken("A"), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("B"), InetAddressAndPort.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddressAndPort.getByName("127.0.0.3"));
 
         Map<String, String> configOptions = new HashMap<>();
         configOptions.put("replication_factor", "2");
@@ -536,15 +536,15 @@ public class StorageServiceServerTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.simpleTransient(2));
         Schema.instance.load(meta);
 
-        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.1"));
+        Collection<Range<Token>> primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.1"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("C"), new StringToken("A")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.2"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.2"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("A"), new StringToken("B")));
 
-        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddress.getByName("127.0.0.3"));
+        primaryRanges = StorageService.instance.getPrimaryRangeForEndpointWithinDC(meta.name, InetAddressAndPort.getByName("127.0.0.3"));
         assert primaryRanges.size() == 1;
         assert primaryRanges.contains(new Range<Token>(new StringToken("B"), new StringToken("C")));
     }
@@ -557,10 +557,10 @@ public class StorageServiceServerTest
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
 
-        metadata.updateNormalToken(new LongToken(1000L), InetAddress.getByName("127.0.0.1"));
-        metadata.updateNormalToken(new LongToken(2000L), InetAddress.getByName("127.0.0.2"));
-        metadata.updateNormalToken(new LongToken(3000L), InetAddress.getByName("127.0.0.3"));
-        metadata.updateNormalToken(new LongToken(4000L), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new LongToken(1000L), InetAddressAndPort.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new LongToken(2000L), InetAddressAndPort.getByName("127.0.0.2"));
+        metadata.updateNormalToken(new LongToken(3000L), InetAddressAndPort.getByName("127.0.0.3"));
+        metadata.updateNormalToken(new LongToken(4000L), InetAddressAndPort.getByName("127.0.0.4"));
 
         Collection<Range<Token>> repairRangeFrom = StorageService.instance.createRepairRangeFrom("1500", "3700");
         assert repairRangeFrom.size() == 3;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index 8172463..f8567e8 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -48,7 +49,7 @@ public class WriteResponseHandlerTest
 {
     static Keyspace ks;
     static ColumnFamilyStore cfs;
-    static List<InetAddress> targets;
+    static List<InetAddressAndPort> targets;
 
     @BeforeClass
     public static void setUpClass() throws Throwable
@@ -57,36 +58,36 @@ public class WriteResponseHandlerTest
         // Register peers with expected DC for NetworkTopologyStrategy.
         TokenMetadata metadata = StorageService.instance.getTokenMetadata();
         metadata.clearUnsafe();
-        metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.1.0.255"));
-        metadata.updateHostId(UUID.randomUUID(), InetAddress.getByName("127.2.0.255"));
+        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.1.0.255"));
+        metadata.updateHostId(UUID.randomUUID(), InetAddressAndPort.getByName("127.2.0.255"));
 
         DatabaseDescriptor.setEndpointSnitch(new IEndpointSnitch()
         {
-            public String getRack(InetAddress endpoint)
+            public String getRack(InetAddressAndPort endpoint)
             {
                 return null;
             }
 
-            public String getDatacenter(InetAddress endpoint)
+            public String getDatacenter(InetAddressAndPort endpoint)
             {
-                byte[] address = endpoint.getAddress();
+                byte[] address = endpoint.address.getAddress();
                 if (address[1] == 1)
                     return "datacenter1";
                 else
                     return "datacenter2";
             }
 
-            public List<InetAddress> getSortedListByProximity(InetAddress address, Collection<InetAddress> unsortedAddress)
+            public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress)
             {
                 return null;
             }
 
-            public void sortByProximity(InetAddress address, List<InetAddress> addresses)
+            public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses)
             {
 
             }
 
-            public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+            public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
             {
                 return 0;
             }
@@ -96,7 +97,7 @@ public class WriteResponseHandlerTest
 
             }
 
-            public boolean isWorthMergingForRangeQuery(List<InetAddress> merged, List<InetAddress> l1, List<InetAddress> l2)
+            public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2)
             {
                 return false;
             }
@@ -105,8 +106,8 @@ public class WriteResponseHandlerTest
         SchemaLoader.createKeyspace("Foo", KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), SchemaLoader.standardCFMD("Foo", "Bar"));
         ks = Keyspace.open("Foo");
         cfs = ks.getColumnFamilyStore("Bar");
-        targets = ImmutableList.of(InetAddress.getByName("127.1.0.255"), InetAddress.getByName("127.1.0.254"), InetAddress.getByName("127.1.0.253"),
-                                   InetAddress.getByName("127.2.0.255"), InetAddress.getByName("127.2.0.254"), InetAddress.getByName("127.2.0.253"));
+        targets = ImmutableList.of(InetAddressAndPort.getByName("127.1.0.255"), InetAddressAndPort.getByName("127.1.0.254"), InetAddressAndPort.getByName("127.1.0.253"),
+                                   InetAddressAndPort.getByName("127.2.0.255"), InetAddressAndPort.getByName("127.2.0.254"), InetAddressAndPort.getByName("127.2.0.253"));
     }
 
     @Before

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index bc24979..4f0c494 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 
 import org.junit.Test;
 
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -36,7 +36,7 @@ public class SessionInfoTest
     public void testTotals()
     {
         TableId tableId = TableId.generate();
-        InetAddress local = FBUtilities.getLocalAddress();
+        InetAddressAndPort local = FBUtilities.getLocalAddressAndPort();
 
         Collection<StreamSummary> summaries = new ArrayList<>();
         for (int i = 0; i < 10; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 5c29698..ceaaae0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -41,6 +40,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
@@ -74,7 +74,7 @@ public class StreamTransferTaskTest
     @Test
     public void testScheduleTimeout() throws Exception
     {
-        InetAddress peer = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         StreamSession session = new StreamSession(peer, peer, (connectionId, protocolVersion) -> new EmbeddedChannel(), 0, true, UUID.randomUUID(), PreviewKind.ALL);
         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
 
@@ -120,7 +120,7 @@ public class StreamTransferTaskTest
     @Test
     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
     {
-        InetAddress peer = FBUtilities.getBroadcastAddress();
+        InetAddressAndPort peer = FBUtilities.getBroadcastAddressAndPort();
         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, new DefaultConnectionFactory(), false, null, PreviewKind.NONE);
         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), StreamOperation.OTHER, Collections.<StreamEventHandler>emptyList(), streamCoordinator);
         StreamSession session = new StreamSession(peer, peer, null, 0, true, null, PreviewKind.NONE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 7a51d0c..16c07a0 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.streaming;
 
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +34,7 @@ import junit.framework.Assert;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
@@ -69,7 +69,7 @@ public class StreamingTransferTest
         DatabaseDescriptor.daemonInitialization();
     }
 
-    public static final InetAddress LOCAL = FBUtilities.getBroadcastAddress();
+    public static final InetAddressAndPort LOCAL = FBUtilities.getBroadcastAddressAndPort();
     public static final String KEYSPACE1 = "StreamingTransferTest1";
     public static final String CF_STANDARD = "Standard1";
     public static final String CF_COUNTER = "Counter1";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
index a9849a3..617bae1 100644
--- a/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/NettyStreamingMessageSenderTest.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -34,6 +35,7 @@ import org.junit.Test;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.async.TestScheduledFuture;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.StreamOperation;
@@ -43,7 +45,7 @@ import org.apache.cassandra.streaming.messages.CompleteMessage;
 
 public class NettyStreamingMessageSenderTest
 {
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0);
 
     private EmbeddedChannel channel;
     private StreamSession session;
@@ -62,8 +64,8 @@ public class NettyStreamingMessageSenderTest
         channel = new EmbeddedChannel();
         channel.attr(NettyStreamingMessageSender.TRANSFERRING_FILE_ATTR).set(Boolean.FALSE);
         UUID pendingRepair = UUID.randomUUID();
-        session = new StreamSession(REMOTE_ADDR.getAddress(), REMOTE_ADDR.getAddress(), (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL);
-        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, pendingRepair, session.getPreviewKind());
+        session = new StreamSession(REMOTE_ADDR, REMOTE_ADDR, (connectionId, protocolVersion) -> null, 0, true, pendingRepair, PreviewKind.ALL);
+        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, UUID.randomUUID(), StreamOperation.REPAIR, REMOTE_ADDR, channel, true, pendingRepair, session.getPreviewKind());
         session.init(future);
         sender = session.getMessageSender();
         sender.setControlMessageChannel(channel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
index a674e6b..b4a736e 100644
--- a/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
+++ b/test/unit/org/apache/cassandra/streaming/async/StreamingInboundHandlerTest.java
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.UUID;
 
+import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,6 +36,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -52,7 +54,7 @@ import org.apache.cassandra.streaming.messages.StreamMessage;
 public class StreamingInboundHandlerTest
 {
     private static final int VERSION = StreamMessage.CURRENT_VERSION;
-    private static final InetSocketAddress REMOTE_ADDR = new InetSocketAddress("127.0.0.2", 0);
+    private static final InetAddressAndPort REMOTE_ADDR = InetAddressAndPort.getByAddressOverrideDefaults(InetAddresses.forString("127.0.0.2"), 0);
 
     private StreamingInboundHandler handler;
     private EmbeddedChannel channel;
@@ -123,7 +125,7 @@ public class StreamingInboundHandlerTest
     @Test
     public void StreamDeserializingTask_deriveSession_StreamInitMessage() throws InterruptedException, IOException
     {
-        StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR.getAddress(), 0, UUID.randomUUID(), StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL);
+        StreamInitMessage msg = new StreamInitMessage(REMOTE_ADDR, 0, UUID.randomUUID(), StreamOperation.REPAIR, true, UUID.randomUUID(), PreviewKind.ALL);
         StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> createSession(sid), null, channel);
         StreamSession session = task.deriveSession(msg);
         Assert.assertNotNull(session);
@@ -145,7 +147,7 @@ public class StreamingInboundHandlerTest
     @Test (expected = IllegalStateException.class)
     public void StreamDeserializingTask_deriveSession_IFM_NoSession() throws InterruptedException, IOException
     {
-        FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), UUID.randomUUID(), 0, 0,
+        FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR, UUID.randomUUID(), 0, 0,
                                                          BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null);
         IncomingFileMessage msg = new IncomingFileMessage(null, header);
         StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);
@@ -156,9 +158,9 @@ public class StreamingInboundHandlerTest
     public void StreamDeserializingTask_deriveSession_IFM_HasSession() throws InterruptedException, IOException
     {
         UUID planId = UUID.randomUUID();
-        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR.getAddress(), channel, true, UUID.randomUUID(), PreviewKind.ALL);
+        StreamResultFuture future = StreamResultFuture.initReceivingSide(0, planId, StreamOperation.REPAIR, REMOTE_ADDR, channel, true, UUID.randomUUID(), PreviewKind.ALL);
         StreamManager.instance.register(future);
-        FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR.getAddress(), planId, 0, 0,
+        FileMessageHeader header = new FileMessageHeader(TableId.generate(), REMOTE_ADDR, planId, 0, 0,
                                                          BigFormat.latestVersion, SSTableFormat.Type.BIG, 0, new ArrayList<>(), null, 0, UUID.randomUUID(), 0 , null);
         IncomingFileMessage msg = new IncomingFileMessage(null, header);
         StreamingInboundHandler.StreamDeserializingTask task = handler.new StreamDeserializingTask(sid -> StreamManager.instance.findSession(sid.from, sid.planId, sid.sessionIndex), null, channel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/tracing/TracingTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tracing/TracingTest.java b/test/unit/org/apache/cassandra/tracing/TracingTest.java
index f546496..61e08b0 100644
--- a/test/unit/org/apache/cassandra/tracing/TracingTest.java
+++ b/test/unit/org/apache/cassandra/tracing/TracingTest.java
@@ -31,6 +31,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.commons.lang3.StringUtils;
 
@@ -197,7 +198,7 @@ public final class TracingTest
             return super.newSession(sessionId, traceType, customPayload);
         }
 
-        protected TraceState newTraceState(InetAddress ia, UUID uuid, Tracing.TraceType tt)
+        protected TraceState newTraceState(InetAddressAndPort ia, UUID uuid, Tracing.TraceType tt)
         {
             return new TraceState(ia, uuid, tt)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
index 2dcd2ac..3d47ff3 100644
--- a/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
+++ b/test/unit/org/apache/cassandra/transport/ErrorMessageTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.transport;
 
-import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
@@ -33,26 +32,27 @@ import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.exceptions.WriteFailureException;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 
 import static org.junit.Assert.assertEquals;
 
 public class ErrorMessageTest
 {
-    private static Map<InetAddress, RequestFailureReason> failureReasonMap1;
-    private static Map<InetAddress, RequestFailureReason> failureReasonMap2;
+    private static Map<InetAddressAndPort, RequestFailureReason> failureReasonMap1;
+    private static Map<InetAddressAndPort, RequestFailureReason> failureReasonMap2;
 
     @BeforeClass
     public static void setUpFixtures() throws UnknownHostException
     {
         failureReasonMap1 = new HashMap<>();
-        failureReasonMap1.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-        failureReasonMap1.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
-        failureReasonMap1.put(InetAddress.getByName("127.0.0.3"), RequestFailureReason.UNKNOWN);
+        failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.1"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+        failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.2"), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
+        failureReasonMap1.put(InetAddressAndPort.getByName("127.0.0.3"), RequestFailureReason.UNKNOWN);
 
         failureReasonMap2 = new HashMap<>();
-        failureReasonMap2.put(InetAddress.getByName("127.0.0.1"), RequestFailureReason.UNKNOWN);
-        failureReasonMap2.put(InetAddress.getByName("127.0.0.2"), RequestFailureReason.UNKNOWN);
+        failureReasonMap2.put(InetAddressAndPort.getByName("127.0.0.1"), RequestFailureReason.UNKNOWN);
+        failureReasonMap2.put(InetAddressAndPort.getByName("127.0.0.2"), RequestFailureReason.UNKNOWN);
     }
 
     @Test
@@ -102,11 +102,11 @@ public class ErrorMessageTest
     @Test
     public void testRequestFailureExceptionMakesCopy() throws UnknownHostException
     {
-        Map<InetAddress, RequestFailureReason> modifiableFailureReasons = new HashMap<>(failureReasonMap1);
+        Map<InetAddressAndPort, RequestFailureReason> modifiableFailureReasons = new HashMap<>(failureReasonMap1);
         ReadFailureException rfe = new ReadFailureException(ConsistencyLevel.ALL, 3, 3, false, modifiableFailureReasons);
         WriteFailureException wfe = new WriteFailureException(ConsistencyLevel.ALL, 3, 3, WriteType.SIMPLE, modifiableFailureReasons);
 
-        modifiableFailureReasons.put(InetAddress.getByName("127.0.0.4"), RequestFailureReason.UNKNOWN);
+        modifiableFailureReasons.put(InetAddressAndPort.getByName("127.0.0.4"), RequestFailureReason.UNKNOWN);
 
         assertEquals(failureReasonMap1, rfe.failureReasonByEndpoint);
         assertEquals(failureReasonMap1, wfe.failureReasonByEndpoint);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
index e287c08..6b95c67 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
@@ -62,7 +62,8 @@ public class ProtocolVersionTest
         Assert.assertNotNull(ProtocolVersion.CURRENT);
 
         Assert.assertFalse(ProtocolVersion.V4.isBeta());
-        Assert.assertTrue(ProtocolVersion.V5.isBeta());
+        Assert.assertFalse(ProtocolVersion.V5.isBeta());
+        Assert.assertTrue(ProtocolVersion.V6.isBeta());
     }
 
     @Test
@@ -72,24 +73,30 @@ public class ProtocolVersionTest
         Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2));
         Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3));
         Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4));
+        Assert.assertTrue(ProtocolVersion.V5.isSmallerOrEqualTo(ProtocolVersion.V5));
 
         Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1));
         Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2));
         Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3));
         Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4));
+        Assert.assertTrue(ProtocolVersion.V5.isGreaterOrEqualTo(ProtocolVersion.V5));
 
         Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2));
         Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3));
         Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4));
+        Assert.assertTrue(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V5));
 
         Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2));
         Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3));
         Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4));
+        Assert.assertFalse(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V5));
 
+        Assert.assertTrue(ProtocolVersion.V5.isGreaterThan(ProtocolVersion.V4));
         Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3));
         Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2));
         Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1));
 
+        Assert.assertFalse(ProtocolVersion.V5.isSmallerThan(ProtocolVersion.V4));
         Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3));
         Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2));
         Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/transport/SerDeserTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
index 2592ae7..d3b9282 100644
--- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java
+++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java
@@ -42,12 +42,14 @@ import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Serialization/deserialization tests for protocol objects and messages.
  */
 public class SerDeserTest
 {
+
     @BeforeClass
     public static void setupDD()
     {
@@ -113,12 +115,12 @@ public class SerDeserTest
     {
         List<Event> events = new ArrayList<>();
 
-        events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddress(), 42));
-        events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddress(), 42));
-        events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddress(), 42));
+        events.add(TopologyChange.newNode(FBUtilities.getBroadcastAddressAndPort()));
+        events.add(TopologyChange.removedNode(FBUtilities.getBroadcastAddressAndPort()));
+        events.add(TopologyChange.movedNode(FBUtilities.getBroadcastAddressAndPort()));
 
-        events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddress(), 42));
-        events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddress(), 42));
+        events.add(StatusChange.nodeUp(FBUtilities.getBroadcastAddressAndPort()));
+        events.add(StatusChange.nodeDown(FBUtilities.getBroadcastAddressAndPort()));
 
         events.add(new SchemaChange(SchemaChange.Change.CREATED, "ks"));
         events.add(new SchemaChange(SchemaChange.Change.UPDATED, "ks"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
index b8d2633..a982624 100644
--- a/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
+++ b/test/unit/org/apache/cassandra/utils/FBUtilitiesTest.java
@@ -153,7 +153,7 @@ public class FBUtilitiesTest
     }
 
     @Test
-    public void testGetBroadcastRpcAddress() throws Exception
+    public void testGetBroadcastNativeAddress() throws Exception
     {
         //When both rpc_address and broadcast_rpc_address are null, it should return the local address (from DD.applyAddressConfig)
         FBUtilities.reset();
@@ -161,21 +161,21 @@ public class FBUtilitiesTest
         testConfig.rpc_address = null;
         testConfig.broadcast_rpc_address = null;
         DatabaseDescriptor.applyAddressConfig(testConfig);
-        assertEquals(FBUtilities.getLocalAddress(), FBUtilities.getBroadcastRpcAddress());
+        assertEquals(FBUtilities.getJustLocalAddress(), FBUtilities.getJustBroadcastNativeAddress());
 
         //When rpc_address is defined and broadcast_rpc_address is null, it should return the rpc_address
         FBUtilities.reset();
         testConfig.rpc_address = "127.0.0.2";
         testConfig.broadcast_rpc_address = null;
         DatabaseDescriptor.applyAddressConfig(testConfig);
-        assertEquals(InetAddress.getByName("127.0.0.2"), FBUtilities.getBroadcastRpcAddress());
+        assertEquals(InetAddress.getByName("127.0.0.2"), FBUtilities.getJustBroadcastNativeAddress());
 
         //When both rpc_address and broadcast_rpc_address are defined, it should return broadcast_rpc_address
         FBUtilities.reset();
         testConfig.rpc_address = "127.0.0.2";
         testConfig.broadcast_rpc_address = "127.0.0.3";
         DatabaseDescriptor.applyAddressConfig(testConfig);
-        assertEquals(InetAddress.getByName("127.0.0.3"), FBUtilities.getBroadcastRpcAddress());
+        assertEquals(InetAddress.getByName("127.0.0.3"), FBUtilities.getJustBroadcastNativeAddress());
 
         FBUtilities.reset();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
index 867918a..ffc1ace 100644
--- a/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
+++ b/tools/stress/src/org/apache/cassandra/stress/CompactionStress.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.stress;
 
 import java.io.File;
 import java.io.IOError;
-import java.net.InetAddress;
 import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
@@ -44,6 +43,7 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.stress.generate.PartitionGenerator;
@@ -185,7 +185,7 @@ public abstract class CompactionStress implements Runnable
         tokenMetadata.clearUnsafe();
         for (int i = 1; i <= numTokens; i++)
         {
-            InetAddress addr = FBUtilities.getBroadcastAddress();
+            InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
             List<Token> tokens = Lists.newArrayListWithCapacity(numTokens);
             for (int j = 0; j < numTokens; ++j)
                 tokens.add(p.getRandomToken(random));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
index a029162..24c10bf 100644
--- a/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
+++ b/tools/stress/src/org/apache/cassandra/stress/settings/SettingsNode.java
@@ -29,6 +29,8 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import com.google.common.net.HostAndPort;
+
 import com.datastax.driver.core.Host;
 import org.apache.cassandra.stress.util.ResultLogger;
 
@@ -37,6 +39,7 @@ public class SettingsNode implements Serializable
     public final List<String> nodes;
     public final boolean isWhiteList;
     public final String datacenter;
+    public final boolean allowServerPortDiscovery;
 
     public SettingsNode(Options options)
     {
@@ -69,6 +72,7 @@ public class SettingsNode implements Serializable
 
         isWhiteList = options.whitelist.setByUser();
         datacenter = options.datacenter.value();
+        allowServerPortDiscovery = options.allowServerPortDiscovery.setByUser();
     }
 
     public Set<String> resolveAllPermitted(StressSettings settings)
@@ -80,7 +84,7 @@ public class SettingsNode implements Serializable
                 if (!isWhiteList)
                 {
                     for (Host host : settings.getJavaDriverClient().getCluster().getMetadata().getAllHosts())
-                        r.add(host.getAddress().getHostName());
+                        r.add(host.getSocketAddress().getHostString() + ":" + host.getSocketAddress().getPort());
                     break;
                 }
             case SIMPLE_NATIVE:
@@ -97,7 +101,8 @@ public class SettingsNode implements Serializable
         {
             try
             {
-                r.add(InetAddress.getByName(node));
+                HostAndPort hap = HostAndPort.fromString(node);
+                r.add(InetAddress.getByName(hap.getHost()));
             }
             catch (UnknownHostException e)
             {
@@ -114,7 +119,8 @@ public class SettingsNode implements Serializable
         {
             try
             {
-                r.add(new InetSocketAddress(InetAddress.getByName(node), port));
+                HostAndPort hap = HostAndPort.fromString(node).withDefaultPort(port);
+                r.add(new InetSocketAddress(InetAddress.getByName(hap.getHost()), hap.getPort()));
             }
             catch (UnknownHostException e)
             {
@@ -139,12 +145,13 @@ public class SettingsNode implements Serializable
         final OptionSimple datacenter = new OptionSimple("datacenter=", ".*", null, "Datacenter used for DCAwareRoundRobinLoadPolicy", false);
         final OptionSimple whitelist = new OptionSimple("whitelist", "", null, "Limit communications to the provided nodes", false);
         final OptionSimple file = new OptionSimple("file=", ".*", null, "Node file (one per line)", false);
+        final OptionSimple allowServerPortDiscovery = new OptionSimple("allow_server_port_discovery", "", null, "Allow Java client to discover server client port numbers", false);
         final OptionSimple list = new OptionSimple("", "[^=,]+(,[^=,]+)*", "localhost", "comma delimited list of nodes", false);
 
         @Override
         public List<? extends Option> options()
         {
-            return Arrays.asList(datacenter, whitelist, file, list);
+            return Arrays.asList(datacenter, whitelist, file, allowServerPortDiscovery, list);
         }
     }
 
@@ -154,6 +161,7 @@ public class SettingsNode implements Serializable
         out.println("  Nodes: " + nodes);
         out.println("  Is White List: " + isWhiteList);
         out.println("  Datacenter: " + datacenter);
+        out.println("  Allow server port discovery: " + allowServerPortDiscovery);
     }
 
     public static SettingsNode get(Map<String, String[]> clArgs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/59b5b6be/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
index 4928cd2..fbcab4b 100644
--- a/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
+++ b/tools/stress/src/org/apache/cassandra/stress/util/JavaDriverClient.java
@@ -54,6 +54,7 @@ public class JavaDriverClient
     private Cluster cluster;
     private Session session;
     private final LoadBalancingPolicy loadBalancingPolicy;
+    private final boolean allowServerPortDiscovery;
 
     private static final ConcurrentMap<String, PreparedStatement> stmts = new ConcurrentHashMap<>();
 
@@ -73,6 +74,7 @@ public class JavaDriverClient
         this.encryptionOptions = encryptionOptions;
         this.loadBalancingPolicy = loadBalancingPolicy(settings);
         this.connectionsPerHost = settings.mode.connectionsPerHost == null ? 8 : settings.mode.connectionsPerHost;
+        this.allowServerPortDiscovery = settings.node.allowServerPortDiscovery;
 
         int maxThreadCount = 0;
         if (settings.rate.auto)
@@ -134,6 +136,9 @@ public class JavaDriverClient
                                                 .withoutJMXReporting()
                                                 .withProtocolVersion(protocolVersion)
                                                 .withoutMetrics(); // The driver uses metrics 3 with conflict with our version
+        if (allowServerPortDiscovery)
+            clusterBuilder = clusterBuilder.allowServerPortDiscovery();
+
         if (loadBalancingPolicy != null)
             clusterBuilder.withLoadBalancingPolicy(loadBalancingPolicy);
         clusterBuilder.withCompression(compression);
@@ -166,7 +171,7 @@ public class JavaDriverClient
         for (Host host : metadata.getAllHosts())
         {
             System.out.printf("Datacenter: %s; Host: %s; Rack: %s%n",
-                    host.getDatacenter(), host.getAddress(), host.getRack());
+                    host.getDatacenter(), host.getAddress() + ":" + host.getSocketAddress().getPort(), host.getRack());
         }
 
         session = cluster.connect();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message