cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r828130 [3/3] - in /incubator/cassandra/trunk: contrib/bmt_example/ contrib/property_snitch/src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/...
Date Wed, 21 Oct 2009 18:26:04 GMT
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/io/StreamContextManager.java Wed Oct 21 18:26:02 2009
@@ -24,10 +24,13 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+import java.net.InetAddress;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.log4j.Logger;
 
 public class StreamContextManager
@@ -42,7 +45,6 @@
     
     public static class StreamContext implements Serializable
     {
-        private static Logger logger_ = Logger.getLogger(StreamContextManager.StreamContext.class);
         private static ICompactSerializer<StreamContext> serializer_;
         
         static
@@ -218,7 +220,7 @@
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream( bos );
             StreamStatusMessage.serializer().serialize(streamStatusMessage, dos);
-            return new Message(StorageService.getLocalStorageEndPoint(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
+            return new Message(FBUtilities.getLocalAddress(), "", StorageService.bootStrapTerminateVerbHandler_, bos.toByteArray());
         }
         
         protected StreamContextManager.StreamStatus streamStatus_;
@@ -249,13 +251,13 @@
     }
         
     /* Maintain a stream context per host that is the source of the stream */
-    public static final Map<String, List<StreamContext>> ctxBag_ = new Hashtable<String, List<StreamContext>>();  
+    public static final Map<InetAddress, List<StreamContext>> ctxBag_ = new Hashtable<InetAddress, List<StreamContext>>();
     /* Maintain in this map the status of the streams that need to be sent back to the source */
-    public static final Map<String, List<StreamStatus>> streamStatusBag_ = new Hashtable<String, List<StreamStatus>>();
+    public static final Map<InetAddress, List<StreamStatus>> streamStatusBag_ = new Hashtable<InetAddress, List<StreamStatus>>();
     /* Maintains a callback handler per endpoint to notify the app that a stream from a given endpoint has been handled */
-    public static final Map<String, IStreamComplete> streamNotificationHandlers_ = new HashMap<String, IStreamComplete>();
+    public static final Map<InetAddress, IStreamComplete> streamNotificationHandlers_ = new HashMap<InetAddress, IStreamComplete>();
     
-    public synchronized static StreamContext getStreamContext(String key)
+    public synchronized static StreamContext getStreamContext(InetAddress key)
     {        
         List<StreamContext> context = ctxBag_.get(key);
         if ( context == null )
@@ -266,7 +268,7 @@
         return streamContext;
     }
     
-    public synchronized static StreamStatus getStreamStatus(String key)
+    public synchronized static StreamStatus getStreamStatus(InetAddress key)
     {
         List<StreamStatus> status = streamStatusBag_.get(key);
         if ( status == null )
@@ -281,27 +283,27 @@
      * This method helps determine if the StreamCompletionHandler needs
      * to be invoked for the data being streamed from a source. 
     */
-    public synchronized static boolean isDone(String key)
+    public synchronized static boolean isDone(InetAddress key)
     {
         return (ctxBag_.get(key) == null);
     }
     
-    public synchronized static IStreamComplete getStreamCompletionHandler(String key)
+    public synchronized static IStreamComplete getStreamCompletionHandler(InetAddress key)
     {
         return streamNotificationHandlers_.get(key);
     }
     
-    public synchronized static void removeStreamCompletionHandler(String key)
+    public synchronized static void removeStreamCompletionHandler(InetAddress key)
     {
         streamNotificationHandlers_.remove(key);
     }
     
-    public synchronized static void registerStreamCompletionHandler(String key, IStreamComplete streamComplete)
+    public synchronized static void registerStreamCompletionHandler(InetAddress key, IStreamComplete streamComplete)
     {
         streamNotificationHandlers_.put(key, streamComplete);
     }
     
-    public synchronized static void addStreamContext(String key, StreamContext streamContext, StreamStatus streamStatus)
+    public synchronized static void addStreamContext(InetAddress key, StreamContext streamContext, StreamStatus streamStatus)
     {
         /* Record the stream context */
         List<StreamContext> context = ctxBag_.get(key);        

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/IMessageSink.java Wed Oct 21 18:26:02 2009
@@ -18,7 +18,7 @@
 
 package org.apache.cassandra.net.sink;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 
 public interface IMessageSink

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/sink/SinkManager.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
 import java.util.*;
 import java.io.IOException;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 
 public class SinkManager

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraDaemon.java Wed Oct 21 18:26:02 2009
@@ -21,7 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Set;
+import java.net.InetAddress;
 
 import org.apache.log4j.Logger;
 
@@ -55,14 +55,14 @@
     private void setup() throws IOException, TTransportException
     {
         int listenPort = DatabaseDescriptor.getThriftPort();
-        String listenAddr = DatabaseDescriptor.getThriftAddress();
+        InetAddress listenAddr = DatabaseDescriptor.getThriftAddress();
         
         /* 
          * If ThriftAddress was left completely unconfigured, then assume
-         * the same default as ListenAddress, (InetAddress.getLocalHost).
+         * the same default as ListenAddress
          */
         if (listenAddr == null)
-            listenAddr = FBUtilities.getHostAddress();
+            listenAddr = FBUtilities.getLocalAddress();
         
         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Wed Oct 21 18:26:02 2009
@@ -33,7 +33,7 @@
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.dht.Token;
@@ -509,9 +509,9 @@
         else if (propertyName.equals(TOKEN_MAP))
         {
             HashMap<String, String> tokenToHostMap = new HashMap<String,String>();
-            Map<Token, EndPoint> endpointMap = storageService.getLiveEndPointMap();
-            for (Map.Entry<Token, EndPoint> e : endpointMap.entrySet())
-                tokenToHostMap.put(e.getKey().toString(), e.getValue().getHost());
+            Map<Token, InetAddress> endpointMap = storageService.getLiveEndPointMap();
+            for (Map.Entry<Token, InetAddress> e : endpointMap.entrySet())
+                tokenToHostMap.put(e.getKey().toString(), e.getValue().getHostAddress());
             return new JSONSerializer().serialize(tokenToHostMap);
         }
         else if (propertyName.equals("version"))

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyManager.java Wed Oct 21 18:26:02 2009
@@ -27,14 +27,12 @@
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.Cachetable;
-import org.apache.cassandra.utils.ICacheExpungeHook;
-import org.apache.cassandra.utils.ICachetable;
-import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.*;
+
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
 
@@ -82,13 +80,13 @@
 		{
 			IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
             /* Add the local storage endpoint to the replicas_ list */
-            replicas_.add(StorageService.getLocalStorageEndPoint());
+            replicas_.add(FBUtilities.getLocalAddress());
 			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
             ReadCommand readCommand = constructReadMessage(false);
             Message message = readCommand.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Performing read repair for " + readCommand_.key + " to " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-			MessagingService.instance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), responseHandler);
+			MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), responseHandler);
 		}
 	}
 	
@@ -133,10 +131,10 @@
 	private static long scheduledTimeMillis_ = 600;
 	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
 	private final Row row_;
-	protected final List<EndPoint> replicas_;
+	protected final List<InetAddress> replicas_;
 	private final ReadCommand readCommand_;
 
-    public ConsistencyManager(Row row, List<EndPoint> replicas, ReadCommand readCommand)
+    public ConsistencyManager(Row row, List<InetAddress> replicas, ReadCommand readCommand)
     {
         row_ = row;
         replicas_ = replicas;
@@ -151,7 +149,7 @@
 			Message message = readCommandDigestOnly.makeReadMessage();
             if (logger_.isDebugEnabled())
               logger_.debug("Reading consistency digest for " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-            MessagingService.instance().sendRR(message, replicas_.toArray(new EndPoint[replicas_.size()]), new DigestResponseHandler());
+            MessagingService.instance().sendRR(message, replicas_.toArray(new InetAddress[replicas_.size()]), new DigestResponseHandler());
 		}
 		catch (IOException ex)
 		{

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadRepairManager.java Wed Oct 21 18:26:02 2009
@@ -22,7 +22,9 @@
 import java.util.concurrent.locks.*;
 
 import org.apache.cassandra.db.RowMutationMessage;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Cachetable;
@@ -53,20 +55,26 @@
 	 * This is the internal class which actually
 	 * implements the global hook function called by the read repair manager
 	 */
-	static class ReadRepairPerformer implements
-			ICacheExpungeHook<String, Message>
+	static class ReadRepairPerformer implements ICacheExpungeHook<String, Message>
 	{
 		/*
 		 * The hook function which takes the end point and the row mutation that 
 		 * needs to be sent to the end point in order 
 		 * to perform read repair.
 		 */
-		public void callMe(String target,
-				Message message)
+		public void callMe(String target, Message message)
 		{
 			String[] pieces = FBUtilities.strip(target, ":");
-			EndPoint to = new EndPoint(pieces[0], Integer.parseInt(pieces[1]));
-			MessagingService.instance().sendOneWay(message, to);
+            InetAddress to = null;
+            try
+            {
+                to = InetAddress.getByName(pieces[0]);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
+            MessagingService.instance().sendOneWay(message, to);
 		}
 
 	}
@@ -101,7 +109,7 @@
 	 * @param target endpoint on which the read repair should happen
 	 * @param rowMutationMessage the row mutation message that has the repaired row.
 	 */
-	public void schedule(EndPoint target, RowMutationMessage rowMutationMessage)
+	public void schedule(InetAddress target, RowMutationMessage rowMutationMessage)
 	{
         try
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Wed Oct 21 18:26:02 2009
@@ -31,7 +31,7 @@
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.RowMutationMessage;
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -61,7 +61,7 @@
         long startTime = System.currentTimeMillis();
 		Row retRow = null;
 		List<Row> rowList = new ArrayList<Row>();
-		List<EndPoint> endPoints = new ArrayList<EndPoint>();
+		List<InetAddress> endPoints = new ArrayList<InetAddress>();
 		String key = null;
 		String table = null;
 		byte[] digest = new byte[0];

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Wed Oct 21 18:26:02 2009
@@ -34,10 +34,11 @@
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -71,14 +72,14 @@
             /*
             int threshold = (int)(StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad());
             int myLoad = localLoad();            
-            EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
+            InetAddress predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
             if (logger_.isDebugEnabled())
               logger_.debug("Trying to relocate the predecessor " + predecessor);
             boolean value = tryThisNode(myLoad, threshold, predecessor);
             if ( !value )
             {
                 loadInfo2_.remove(predecessor);
-                EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+                InetAddress successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
                 if (logger_.isDebugEnabled())
                   logger_.debug("Trying to relocate the successor " + successor);
                 value = tryThisNode(myLoad, threshold, successor);
@@ -87,7 +88,7 @@
                     loadInfo2_.remove(successor);
                     while ( !loadInfo2_.isEmpty() )
                     {
-                        EndPoint target = findARandomLightNode();
+                        InetAddress target = findARandomLightNode();
                         if ( target != null )
                         {
                             if (logger_.isDebugEnabled())
@@ -123,7 +124,7 @@
         }
 
         /*
-        private boolean tryThisNode(int myLoad, int threshold, EndPoint target)
+        private boolean tryThisNode(int myLoad, int threshold, InetAddress target)
         {
             boolean value = false;
             LoadInfo li = loadInfo2_.get(target);
@@ -155,7 +156,7 @@
     {
         public void doVerb(Message message)
         {
-            Message reply = message.getReply(StorageService.getLocalStorageEndPoint(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
+            Message reply = message.getReply(FBUtilities.getLocalAddress(), new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
             MessagingService.instance().sendOneWay(reply, message.getFrom());
             if ( isMoveable_.get() )
             {
@@ -186,9 +187,9 @@
 
     /* this indicates whether this node is already helping someone else */
     private AtomicBoolean isMoveable_ = new AtomicBoolean(false);
-    private Map<EndPoint, Double> loadInfo_ = new HashMap<EndPoint, Double>();
+    private Map<InetAddress, Double> loadInfo_ = new HashMap<InetAddress, Double>();
     /* This map is a clone of the one above and is used for various calculations during LB operation */
-    private Map<EndPoint, Double> loadInfo2_ = new HashMap<EndPoint, Double>();
+    private Map<InetAddress, Double> loadInfo2_ = new HashMap<InetAddress, Double>();
     /* This thread pool is used for initiating load balancing operations */
     private ExecutorService lb_ = new DebuggableThreadPoolExecutor("LB-OPERATIONS");
     /* This thread pool is used by target node to leave the ring. */
@@ -204,7 +205,7 @@
         Gossiper.instance().register(this);
     }
 
-    public void onChange(EndPoint endpoint, EndPointState epState)
+    public void onChange(InetAddress endpoint, EndPointState epState)
     {
         // load information for this specified endpoint for load balancing 
         ApplicationState loadInfoState = epState.getApplicationState(LoadDisseminator.loadInfo_);
@@ -232,7 +233,7 @@
         if ( !isMoveable_.get() )
             return false;
         int myload = localLoad();
-        EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+        InetAddress successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
         LoadInfo li = loadInfo2_.get(successor);
         // "load" is NULL means that the successor node has not
         // yet gossiped its load information. We should return
@@ -249,17 +250,17 @@
 
     private double localLoad()
     {
-        Double load = loadInfo2_.get(StorageService.getLocalStorageEndPoint());
+        Double load = loadInfo2_.get(FBUtilities.getLocalAddress());
         return load == null ? 0 : load;
     }
 
     private double averageSystemLoad()
     {
         int nodeCount = loadInfo2_.size();
-        Set<EndPoint> nodes = loadInfo2_.keySet();
+        Set<InetAddress> nodes = loadInfo2_.keySet();
 
         double systemLoad = 0;
-        for (EndPoint node : nodes)
+        for (InetAddress node : nodes)
         {
             systemLoad += loadInfo2_.get(node);
         }
@@ -274,7 +275,7 @@
         return ( localLoad() > ( StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad() ) );
     }
 
-    private boolean isMoveable(EndPoint target)
+    private boolean isMoveable(InetAddress target)
     {
         double threshold = StorageLoadBalancer.TOPHEAVY_RATIO * averageSystemLoad();
         if (isANeighbour(target))
@@ -295,20 +296,20 @@
         }
         else
         {
-            EndPoint successor = StorageService.instance().getSuccessor(target);
+            InetAddress successor = StorageService.instance().getSuccessor(target);
             double sLoad = loadInfo2_.get(successor);
             double targetLoad = loadInfo2_.get(target);
             return (sLoad + targetLoad) <= threshold;
         }
     }
 
-    private boolean isANeighbour(EndPoint neighbour)
+    private boolean isANeighbour(InetAddress neighbour)
     {
-        EndPoint predecessor = StorageService.instance().getPredecessor(StorageService.getLocalStorageEndPoint());
+        InetAddress predecessor = StorageService.instance().getPredecessor(FBUtilities.getLocalAddress());
         if ( predecessor.equals(neighbour) )
             return true;
 
-        EndPoint successor = StorageService.instance().getSuccessor(StorageService.getLocalStorageEndPoint());
+        InetAddress successor = StorageService.instance().getSuccessor(FBUtilities.getLocalAddress());
         if ( successor.equals(neighbour) )
             return true;
 
@@ -320,13 +321,13 @@
      * random one of the lightly loaded nodes and use them as
      * a potential target for load balance.
     */
-    private EndPoint findARandomLightNode()
+    private InetAddress findARandomLightNode()
     {
-        List<EndPoint> potentialCandidates = new ArrayList<EndPoint>();
-        Set<EndPoint> allTargets = loadInfo2_.keySet();
+        List<InetAddress> potentialCandidates = new ArrayList<InetAddress>();
+        Set<InetAddress> allTargets = loadInfo2_.keySet();
         double avgLoad = averageSystemLoad();
 
-        for (EndPoint target : allTargets)
+        for (InetAddress target : allTargets)
         {
             double load = loadInfo2_.get(target);
             if (load < avgLoad)
@@ -344,7 +345,7 @@
         return null;
     }
 
-    public Map<EndPoint, Double> getLoadInfo()
+    public Map<InetAddress, Double> getLoadInfo()
     {
         return loadInfo_;
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Oct 21 18:26:02 2009
@@ -28,11 +28,12 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.dht.IPartitioner;
 
@@ -69,15 +70,15 @@
      * sent over the wire to N replicas where some of the replicas
      * may be hints.
      */
-    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
+    private static Map<InetAddress, Message> createWriteMessages(RowMutation rm, Map<InetAddress, InetAddress> endpointMap) throws IOException
     {
-		Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+		Map<InetAddress, Message> messageMap = new HashMap<InetAddress, Message>();
 		Message message = rm.makeRowMutationMessage();
 
-		for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
+		for (Map.Entry<InetAddress, InetAddress> entry : endpointMap.entrySet())
 		{
-            EndPoint target = entry.getKey();
-            EndPoint hint = entry.getValue();
+            InetAddress target = entry.getKey();
+            InetAddress hint = entry.getValue();
             if ( !target.equals(hint) )
 			{
 				Message hintedMessage = rm.makeRowMutationMessage();
@@ -113,13 +114,14 @@
         long startTime = System.currentTimeMillis();
 		try
 		{
-            EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
-			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
-			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
-			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
+            InetAddress[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+            // (This is the ZERO consistency level, so user doesn't care if we don't really have N destinations available.)
+			Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
+			Map<InetAddress, Message> messageMap = createWriteMessages(rm, endpointMap);
+			for (Map.Entry<InetAddress, Message> entry : messageMap.entrySet())
 			{
                 Message message = entry.getValue();
-                EndPoint endpoint = entry.getKey();
+                InetAddress endpoint = entry.getKey();
                 if (logger.isDebugEnabled())
                     logger.debug("insert writing key " + rm.key() + " to " + message.getMessageId() + "@" + endpoint);
                 MessagingService.instance().sendOneWay(message, endpoint);
@@ -149,10 +151,10 @@
         }
         try
         {
-            EndPoint[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
-			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
+            InetAddress[] naturalEndpoints = StorageService.instance().getReadStorageEndPoints(rm.key());
+            Map<InetAddress, InetAddress> endpointMap = StorageService.instance().getHintedStorageEndpointMap(rm.key(), naturalEndpoints);
             int blockFor = determineBlockFor(naturalEndpoints.length, endpointMap.size(), consistency_level);
-            List<EndPoint> primaryNodes = getUnhintedNodes(endpointMap);
+            List<InetAddress> primaryNodes = getUnhintedNodes(endpointMap);
             if (primaryNodes.size() < blockFor) // guarantee blockFor = W live nodes.
             {
                 throw new UnavailableException();
@@ -162,12 +164,12 @@
                 logger.debug("insertBlocking writing key " + rm.key() + " to " + message.getMessageId() + "@[" + StringUtils.join(endpointMap.keySet(), ", ") + "]");
 
             // Get all the targets and stick them in an array
-            MessagingService.instance().sendRR(message, primaryNodes.toArray(new EndPoint[primaryNodes.size()]), quorumResponseHandler);
+            MessagingService.instance().sendRR(message, primaryNodes.toArray(new InetAddress[primaryNodes.size()]), quorumResponseHandler);
             if (!quorumResponseHandler.get())
                 throw new UnavailableException();
             if (primaryNodes.size() < endpointMap.size()) // Do we need to bother with Hinted Handoff?
             {
-                for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+                for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
                 {
                     if (e.getKey() != e.getValue()) // Hinted Handoff to target
                     {
@@ -187,10 +189,10 @@
         }
     }
 
-    private static List<EndPoint> getUnhintedNodes(Map<EndPoint, EndPoint> endpointMap)
+    private static List<InetAddress> getUnhintedNodes(Map<InetAddress, InetAddress> endpointMap)
     {
-        List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(endpointMap.size());
-        for (Map.Entry<EndPoint, EndPoint> e : endpointMap.entrySet())
+        List<InetAddress> liveEndPoints = new ArrayList<InetAddress>(endpointMap.size());
+        for (Map.Entry<InetAddress, InetAddress> e : endpointMap.entrySet())
         {
             if (e.getKey() == e.getValue())
             {
@@ -247,7 +249,7 @@
 
         for (ReadCommand command: commands)
         {
-            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+            InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.key);
             Message message = command.makeReadMessage();
 
             if (logger.isDebugEnabled())
@@ -296,8 +298,8 @@
 
             for (ReadCommand command: commands)
             {
-                EndPoint[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
-                boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+                InetAddress[] endpoints = StorageService.instance().getReadStorageEndPoints(command.key);
+                boolean foundLocal = Arrays.asList(endpoints).contains(FBUtilities.getLocalAddress());
                 //TODO: Throw InvalidRequest if we're in bootstrap mode?
                 if (foundLocal && !StorageService.instance().isBootstrapMode())
                 {
@@ -340,7 +342,7 @@
     private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException, UnavailableException
     {
         List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
-        List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>();
+        List<InetAddress[]> commandEndPoints = new ArrayList<InetAddress[]>();
         List<Row> rows = new ArrayList<Row>();
 
         int commandIndex = 0;
@@ -356,11 +358,11 @@
             Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(DatabaseDescriptor.getQuorum(), new ReadResponseResolver());
-            EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
-            List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
+            InetAddress dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+            List<InetAddress> endpointList = new ArrayList<InetAddress>(Arrays.asList(StorageService.instance().getReadStorageEndPoints(command.key)));
             /* Remove the local storage endpoint from the list. */
             endpointList.remove(dataPoint);
-            EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+            InetAddress[] endPoints = new InetAddress[endpointList.size() + 1];
             Message messages[] = new Message[endpointList.size() + 1];
 
             /*
@@ -374,7 +376,7 @@
                 logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
             for (int i = 1; i < endPoints.length; i++)
             {
-                EndPoint digestPoint = endpointList.get(i - 1);
+                InetAddress digestPoint = endpointList.get(i - 1);
                 endPoints[i] = digestPoint;
                 messages[i] = messageDigestOnly;
                 if (logger.isDebugEnabled())
@@ -440,9 +442,9 @@
         List<Row> rows = new ArrayList<Row>();
         for (ReadCommand command: commands)
         {
-            List<EndPoint> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
+            List<InetAddress> endpoints = StorageService.instance().getLiveReadStorageEndPoints(command.key);
             /* Remove the local storage endpoint from the list. */
-            endpoints.remove(StorageService.getLocalStorageEndPoint());
+            endpoints.remove(FBUtilities.getLocalAddress());
             // TODO: throw a thrift exception if we do not have N nodes
 
             if (logger.isDebugEnabled())
@@ -471,9 +473,9 @@
         List<String> allKeys = new ArrayList<String>();
         RangeCommand command = rawCommand;
 
-        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
-        EndPoint startEndpoint = endPoint;
-        EndPoint wrapEndpoint = tokenMetadata.getFirstEndpoint();
+        InetAddress endPoint = StorageService.instance().findSuitableEndPoint(command.startWith);
+        InetAddress startEndpoint = endPoint;
+        InetAddress wrapEndpoint = tokenMetadata.getFirstEndpoint();
 
         do
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Oct 21 18:26:02 2009
@@ -18,13 +18,13 @@
 
 package org.apache.cassandra.service;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.net.InetAddress;
 import javax.management.*;
 
 import org.apache.cassandra.concurrent.*;
@@ -34,8 +34,6 @@
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.tools.MembershipCleanerVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -70,40 +68,26 @@
     public final static String bootStrapInitiateDoneVerbHandler_ = "BOOTSTRAP-INITIATE-DONE-VERB-HANDLER";
     public final static String bootStrapTerminateVerbHandler_ = "BOOTSTRAP-TERMINATE-VERB-HANDLER";
     public final static String dataFileVerbHandler_ = "DATA-FILE-VERB-HANDLER";
-    public final static String mbrshipCleanerVerbHandler_ = "MBRSHIP-CLEANER-VERB-HANDLER";
     public final static String bootstrapMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String rangeVerbHandler_ = "RANGE-VERB-HANDLER";
     public final static String bootstrapTokenVerbHandler_ = "SPLITS-VERB-HANDLER";
 
-    private static EndPoint tcpAddr_;
-    private static EndPoint udpAddr_;
     private static IPartitioner partitioner_ = DatabaseDescriptor.getPartitioner();
 
-
     private static volatile StorageService instance_;
 
-    public static EndPoint getLocalStorageEndPoint()
-    {
-        return tcpAddr_;
-    }
-
-    public static EndPoint getLocalControlEndPoint()
-    {
-        return udpAddr_;
-    }
-
     public static IPartitioner<?> getPartitioner() {
         return partitioner_;
     }
 
     public Set<Range> getLocalRanges()
     {
-        return getRangesForEndPoint(getLocalStorageEndPoint());
+        return getRangesForEndPoint(FBUtilities.getLocalAddress());
     }
 
     public Range getLocalPrimaryRange()
     {
-        return getPrimaryRangeForEndPoint(getLocalStorageEndPoint());
+        return getPrimaryRangeForEndPoint(FBUtilities.getLocalAddress());
     }
 
     /*
@@ -157,16 +141,16 @@
     private AbstractReplicationStrategy replicationStrategy_;
     /* Are we starting this node in bootstrap mode? */
     private boolean isBootstrapMode;
-    private Set<EndPoint> bootstrapSet;
+    private Set<InetAddress> bootstrapSet;
   
-    public synchronized void addBootstrapSource(EndPoint s)
+    public synchronized void addBootstrapSource(InetAddress s)
     {
         if (logger_.isDebugEnabled())
             logger_.debug("Added " + s + " as a bootstrap source");
         bootstrapSet.add(s);
     }
     
-    public synchronized boolean removeBootstrapSource(EndPoint s)
+    public synchronized boolean removeBootstrapSource(InetAddress s)
     {
         bootstrapSet.remove(s);
 
@@ -176,7 +160,7 @@
         {
             SystemTable.setBootstrapped();
             isBootstrapMode = false;
-            updateTokenMetadata(storageMetadata_.getToken(), StorageService.tcpAddr_, false);
+            updateTokenMetadata(storageMetadata_.getToken(), FBUtilities.getLocalAddress(), false);
 
             logger_.info("Bootstrap completed! Now serving reads.");
             /* Tell others you're not bootstrapping anymore */
@@ -185,7 +169,7 @@
         return isBootstrapMode;
     }
 
-    private void updateTokenMetadata(Token token, EndPoint endpoint, boolean isBootstraping)
+    private void updateTokenMetadata(Token token, InetAddress endpoint, boolean isBootstraping)
     {
         tokenMetadata_.update(token, endpoint, isBootstraping);
         if (!isBootstraping)
@@ -213,7 +197,7 @@
             throw new RuntimeException(e);
         }
 
-        bootstrapSet = new HashSet<EndPoint>();
+        bootstrapSet = new HashSet<InetAddress>();
         endPointSnitch_ = DatabaseDescriptor.getEndPointSnitch();
 
         /* register the verb handlers */
@@ -223,7 +207,6 @@
         MessagingService.instance().registerVerbHandlers(readRepairVerbHandler_, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(readVerbHandler_, new ReadVerbHandler());
         MessagingService.instance().registerVerbHandlers(dataFileVerbHandler_, new DataFileVerbHandler() );
-        MessagingService.instance().registerVerbHandlers(mbrshipCleanerVerbHandler_, new MembershipCleanerVerbHandler() );
         MessagingService.instance().registerVerbHandlers(rangeVerbHandler_, new RangeVerbHandler());
         // see BootStrapper for a summary of how the bootstrap verbs interact
         MessagingService.instance().registerVerbHandlers(bootstrapTokenVerbHandler_, new BootStrapper.BootstrapTokenVerbHandler());
@@ -252,15 +235,13 @@
     public void start() throws IOException
     {
         storageMetadata_ = SystemTable.initMetadata();
-        tcpAddr_ = new EndPoint(FBUtilities.getHostAddress(), DatabaseDescriptor.getStoragePort());
-        udpAddr_ = new EndPoint(FBUtilities.getHostAddress(), DatabaseDescriptor.getControlPort());
         isBootstrapMode = DatabaseDescriptor.isAutoBootstrap()
-                          && !(DatabaseDescriptor.getSeeds().contains(udpAddr_.getHost()) || SystemTable.isBootstrapped());
+                          && !(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) || SystemTable.isBootstrapped());
 
         /* Listen for application messages */
-        MessagingService.instance().listen(tcpAddr_);
+        MessagingService.instance().listen(FBUtilities.getLocalAddress());
         /* Listen for control messages */
-        MessagingService.instance().listenUDP(udpAddr_);
+        MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
 
         SelectorManager.getSelectorManager().start();
         SelectorManager.getUdpSelectorManager().start();
@@ -271,7 +252,7 @@
         // for bootstrap to get the load info it needs.
         // (we won't be part of the storage ring though until we add a nodeId to our state, below.)
         Gossiper.instance().register(this);
-        Gossiper.instance().start(udpAddr_, storageMetadata_.getGeneration());
+        Gossiper.instance().start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
 
         if (isBootstrapMode)
         {
@@ -280,7 +261,7 @@
         else
         {
             SystemTable.setBootstrapped();
-            tokenMetadata_.update(storageMetadata_.getToken(), StorageService.tcpAddr_, isBootstrapMode);
+            tokenMetadata_.update(storageMetadata_.getToken(), FBUtilities.getLocalAddress(), isBootstrapMode);
         }
 
         // Gossip my token.
@@ -301,7 +282,7 @@
     }
 
     /* TODO: used for testing */
-    public void updateTokenMetadataUnsafe(Token token, EndPoint endpoint)
+    public void updateTokenMetadataUnsafe(Token token, InetAddress endpoint)
     {
         tokenMetadata_.update(token, endpoint);
     }
@@ -312,13 +293,13 @@
     }
     
     /*
-     * Given an EndPoint this method will report if the
+     * Given an InetAddress this method will report if the
      * endpoint is in the same data center as the local
      * storage endpoint.
     */
-    public boolean isInSameDataCenter(EndPoint endpoint) throws IOException
+    public boolean isInSameDataCenter(InetAddress endpoint) throws IOException
     {
-        return endPointSnitch_.isInSameDataCenter(StorageService.tcpAddr_, endpoint);
+        return endPointSnitch_.isInSameDataCenter(FBUtilities.getLocalAddress(), endpoint);
     }
     
     /*
@@ -326,7 +307,7 @@
      * sure that the N replicas are in sync. We do this in the
      * background when we do not care much about consistency.
      */
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand command)
+    public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
     {
         Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, command);
         consistencyManager_.submit(consistencySentinel);
@@ -335,11 +316,11 @@
     public Map<Range, List<String>> getRangeToEndPointMap()
     {
         /* Get the token to endpoint map. */
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         /* All the ranges for the tokens */
         Range[] ranges = getAllRanges(tokenToEndPointMap.keySet());
         Map<Range, List<String>> map = new HashMap<Range, List<String>>();
-        for (Map.Entry<Range,List<EndPoint>> entry : constructRangeToEndPointMap(ranges).entrySet())
+        for (Map.Entry<Range,List<InetAddress>> entry : constructRangeToEndPointMap(ranges).entrySet())
         {
             map.put(entry.getKey(), stringify(entry.getValue()));
         }
@@ -352,14 +333,14 @@
      * @param ranges
      * @return mapping of ranges to the replicas responsible for them.
     */
-    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges)
+    public Map<Range, List<InetAddress>> constructRangeToEndPointMap(Range[] ranges)
     {
-        Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
+        Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
         for (Range range : ranges)
         {
-            EndPoint[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right());
+            InetAddress[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right());
             // create a new ArrayList since a bunch of methods like to mutate the endpointmap List
-            rangeToEndPointMap.put(range, new ArrayList<EndPoint>(Arrays.asList(endpoints)));
+            rangeToEndPointMap.put(range, new ArrayList<InetAddress>(Arrays.asList(endpoints)));
         }
         return rangeToEndPointMap;
     }
@@ -371,15 +352,15 @@
      * @param tokenToEndPointMap mapping of token to endpoints.
      * @return mapping of ranges to the replicas responsible for them.
     */
-    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
+    public Map<Range, List<InetAddress>> constructRangeToEndPointMap(Range[] ranges, Map<Token, InetAddress> tokenToEndPointMap)
     {
         if (logger_.isDebugEnabled())
           logger_.debug("Constructing range to endpoint map ...");
-        Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
+        Map<Range, List<InetAddress>> rangeToEndPointMap = new HashMap<Range, List<InetAddress>>();
         for ( Range range : ranges )
         {
-            EndPoint[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
-            rangeToEndPointMap.put(range, new ArrayList<EndPoint>( Arrays.asList(endpoints) ) );
+            InetAddress[] endpoints = replicationStrategy_.getReadStorageEndPoints(range.right(), tokenToEndPointMap);
+            rangeToEndPointMap.put(range, new ArrayList<InetAddress>( Arrays.asList(endpoints) ) );
         }
         if (logger_.isDebugEnabled())
           logger_.debug("Done constructing range to endpoint map ...");
@@ -391,9 +372,8 @@
      *  we are interested in new tokens as a result of a new node or an
      *  existing node moving to a new location on the ring.
     */
-    public void onChange(EndPoint endpoint, EndPointState epState)
+    public void onChange(InetAddress endpoint, EndPointState epState)
     {
-        EndPoint ep = new EndPoint(endpoint.getHost(), DatabaseDescriptor.getStoragePort());
         /* node identifier for this endpoint on the identifier space */
         ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
         /* Check if this has a bootstrapping state message */
@@ -401,14 +381,14 @@
         if (bootstrapState)
         {
             if (logger_.isDebugEnabled())
-                logger_.debug(ep + " is in bootstrap state.");
+                logger_.debug(endpoint + " is in bootstrap state.");
         }
         if (nodeIdState != null)
         {
             Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
             if (logger_.isDebugEnabled())
               logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
-            Token oldToken = tokenMetadata_.getToken(ep);
+            Token oldToken = tokenMetadata_.getToken(endpoint);
 
             if ( oldToken != null )
             {
@@ -421,8 +401,8 @@
                 if ( !oldToken.equals(newToken) )
                 {
                     if (logger_.isDebugEnabled())
-                      logger_.debug("Relocation for endpoint " + ep);
-                    updateTokenMetadata(newToken, ep, bootstrapState);
+                      logger_.debug("Relocation for endpoint " + endpoint);
+                    updateTokenMetadata(newToken, endpoint, bootstrapState);
                 }
                 else
                 {
@@ -431,7 +411,7 @@
                      * Deliver the hints that we have for this endpoint.
                     */
                     if (logger_.isDebugEnabled())
-                      logger_.debug("Sending hinted data to " + ep);
+                      logger_.debug("Sending hinted data to " + endpoint);
                     deliverHints(endpoint);
                 }
             }
@@ -440,7 +420,7 @@
                 /*
                  * This is a new node and we just update the token map.
                 */
-                updateTokenMetadata(newToken, ep, bootstrapState);
+                updateTokenMetadata(newToken, endpoint, bootstrapState);
             }
         }
         else
@@ -452,8 +432,8 @@
             if ( epState.isAlive() && tokenMetadata_.isKnownEndPoint(endpoint) )
             {
                 if (logger_.isDebugEnabled())
-                  logger_.debug("EndPoint " + ep + " just recovered from a partition. Sending hinted data.");
-                deliverHints(ep);
+                  logger_.debug("InetAddress " + endpoint + " just recovered from a partition. Sending hinted data.");
+                deliverHints(endpoint);
             }
         }
     }
@@ -472,15 +452,15 @@
     public Map<String, String> getLoadMap()
     {
         Map<String, String> map = new HashMap<String, String>();
-        for (Map.Entry<EndPoint,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
+        for (Map.Entry<InetAddress,Double> entry : StorageLoadBalancer.instance().getLoadInfo().entrySet())
         {
-            map.put(entry.getKey().getHost(), FileUtils.stringifyFileSize(entry.getValue()));
+            map.put(entry.getKey().getHostAddress(), FileUtils.stringifyFileSize(entry.getValue()));
         }
         // gossiper doesn't bother sending to itself, so if there are no other nodes around
         // we need to cheat to get load information for the local node
-        if (!map.containsKey(getLocalControlEndPoint().getHost()))
+        if (!map.containsKey(FBUtilities.getLocalAddress().getHostAddress()))
         {
-            map.put(getLocalControlEndPoint().getHost(), getLoadString());
+            map.put(FBUtilities.getLocalAddress().getHostAddress(), getLoadString());
         }
         return map;
     }
@@ -496,7 +476,7 @@
         /* update the token on disk */
         SystemTable.updateToken(token);
         /* Update the token maps */
-        tokenMetadata_.update(token, StorageService.tcpAddr_);
+        tokenMetadata_.update(token, FBUtilities.getLocalAddress());
         /* Gossip this new token for the local storage instance */
         ApplicationState state = new ApplicationState(StorageService.getPartitioner().getTokenFactory().toString(token));
         Gossiper.instance().addApplicationState(StorageService.nodeId_, state);
@@ -509,7 +489,7 @@
      *  @param endpoint remove the token state associated with this 
      *         endpoint.
      */
-    public void removeTokenState(EndPoint endpoint) 
+    public void removeTokenState(InetAddress endpoint)
     {
         tokenMetadata_.remove(endpoint);
         /* Remove the state from the Gossiper */
@@ -520,14 +500,14 @@
      * Deliver hints to the specified node when it has crashed
      * and come back up/ marked as alive after a network partition
     */
-    public final void deliverHints(EndPoint endpoint)
+    public final void deliverHints(InetAddress endpoint)
     {
         HintedHandOffManager.instance().deliverHints(endpoint);
     }
 
     public Token getLocalToken()
     {
-        return tokenMetadata_.getToken(tcpAddr_);
+        return tokenMetadata_.getToken(FBUtilities.getLocalAddress());
     }
 
     /* This methods belong to the MBean interface */
@@ -547,29 +527,29 @@
         return stringify(Gossiper.instance().getUnreachableMembers());
     }
 
-    private Set<String> stringify(Set<EndPoint> endPoints)
+    private Set<String> stringify(Set<InetAddress> endPoints)
     {
         Set<String> stringEndPoints = new HashSet<String>();
-        for (EndPoint ep : endPoints)
+        for (InetAddress ep : endPoints)
         {
-            stringEndPoints.add(ep.getHost());
+            stringEndPoints.add(ep.getHostAddress());
         }
         return stringEndPoints;
     }
 
-    private List<String> stringify(List<EndPoint> endPoints)
+    private List<String> stringify(List<InetAddress> endPoints)
     {
         List<String> stringEndPoints = new ArrayList<String>();
-        for (EndPoint ep : endPoints)
+        for (InetAddress ep : endPoints)
         {
-            stringEndPoints.add(ep.getHost());
+            stringEndPoints.add(ep.getHostAddress());
         }
         return stringEndPoints;
     }
 
     public int getCurrentGenerationNumber()
     {
-        return Gossiper.instance().getCurrentGenerationNumber(udpAddr_);
+        return Gossiper.instance().getCurrentGenerationNumber(FBUtilities.getLocalAddress());
     }
 
     public void forceTableCleanup() throws IOException
@@ -591,51 +571,6 @@
             table.forceCompaction();
         }        
     }
-    
-    public void forceHandoff(List<String> dataDirectories, String host) throws IOException
-    {       
-        List<File> filesList = new ArrayList<File>();
-        List<StreamContextManager.StreamContext> streamContexts = new ArrayList<StreamContextManager.StreamContext>();
-        
-        for (String dataDir : dataDirectories)
-        {
-            File directory = new File(dataDir);
-            Collections.addAll(filesList, directory.listFiles());            
-        
-
-            for (File tableDir : directory.listFiles())
-            {
-                String tableName = tableDir.getName();
-
-                for (File file : tableDir.listFiles())
-                {
-                    streamContexts.add(new StreamContextManager.StreamContext(file.getAbsolutePath(), file.length(), tableName));
-                    if (logger_.isDebugEnabled())
-                      logger_.debug("Stream context metadata " + streamContexts);
-                }
-            }
-        }
-        
-        if ( streamContexts.size() > 0 )
-        {
-            EndPoint target = new EndPoint(host, DatabaseDescriptor.getStoragePort());
-            /* Set up the stream manager with the files that need to streamed */
-            final StreamContextManager.StreamContext[] contexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
-            StreamManager.instance(target).addFilesToStream(contexts);
-            /* Send the bootstrap initiate message */
-            final StreamContextManager.StreamContext[] bootContexts = streamContexts.toArray(new StreamContextManager.StreamContext[streamContexts.size()]);
-            BootstrapInitiateMessage biMessage = new BootstrapInitiateMessage(bootContexts);
-            Message message = BootstrapInitiateMessage.makeBootstrapInitiateMessage(biMessage);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Sending a bootstrap initiate message to " + target + " ...");
-            MessagingService.instance().sendOneWay(message, target);
-            if (logger_.isDebugEnabled())
-              logger_.debug("Waiting for transfer to " + target + " to complete");
-            StreamManager.instance(target).waitForStreamCompletion();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Done with transfer to " + target);  
-        }
-    }
 
     /**
      * Takes the snapshot for a given table.
@@ -705,7 +640,7 @@
      * This method returns the predecessor of the endpoint ep on the identifier
      * space.
      */
-    EndPoint getPredecessor(EndPoint ep)
+    InetAddress getPredecessor(InetAddress ep)
     {
         Token token = tokenMetadata_.getToken(ep);
         return tokenMetadata_.getEndPoint(replicationStrategy_.getPredecessor(token, tokenMetadata_.cloneTokenEndPointMap()));
@@ -715,7 +650,7 @@
      * This method returns the successor of the endpoint ep on the identifier
      * space.
      */
-    public EndPoint getSuccessor(EndPoint ep)
+    public InetAddress getSuccessor(InetAddress ep)
     {
         Token token = tokenMetadata_.getToken(ep);
         return tokenMetadata_.getEndPoint(replicationStrategy_.getSuccessor(token, tokenMetadata_.cloneTokenEndPointMap()));
@@ -726,7 +661,7 @@
      * @param ep endpoint we are interested in.
      * @return range for the specified endpoint.
      */
-    public Range getPrimaryRangeForEndPoint(EndPoint ep)
+    public Range getPrimaryRangeForEndPoint(InetAddress ep)
     {
         Token right = tokenMetadata_.getToken(ep);
         return replicationStrategy_.getPrimaryRangeFor(right, tokenMetadata_.cloneTokenEndPointMap());
@@ -737,7 +672,7 @@
      * @param ep endpoint we are interested in.
      * @return ranges for the specified endpoint.
      */
-    Set<Range> getRangesForEndPoint(EndPoint ep)
+    Set<Range> getRangesForEndPoint(InetAddress ep)
     {
         return replicationStrategy_.getRangeMap().get(ep);
     }
@@ -771,11 +706,11 @@
      * @param key - key for which we need to find the endpoint
      * @return value - the endpoint responsible for this key
      */
-    public EndPoint getPrimary(String key)
+    public InetAddress getPrimary(String key)
     {
-        EndPoint endpoint = StorageService.tcpAddr_;
+        InetAddress endpoint = FBUtilities.getLocalAddress();
         Token token = partitioner_.getToken(key);
-        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, InetAddress> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         if (tokens.size() > 0)
         {
@@ -809,8 +744,8 @@
     */
     public boolean isPrimary(String key)
     {
-        EndPoint endpoint = getPrimary(key);
-        return StorageService.tcpAddr_.equals(endpoint);
+        InetAddress endpoint = getPrimary(key);
+        return FBUtilities.getLocalAddress().equals(endpoint);
     }
 
     /**
@@ -820,7 +755,7 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public EndPoint[] getReadStorageEndPoints(String key)
+    public InetAddress[] getReadStorageEndPoints(String key)
     {
         return replicationStrategy_.getReadStorageEndPoints(partitioner_.getToken(key));
     }    
@@ -832,12 +767,12 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public List<EndPoint> getLiveReadStorageEndPoints(String key)
+    public List<InetAddress> getLiveReadStorageEndPoints(String key)
     {
-    	List<EndPoint> liveEps = new ArrayList<EndPoint>();
-    	EndPoint[] endpoints = getReadStorageEndPoints(key);
+    	List<InetAddress> liveEps = new ArrayList<InetAddress>();
+    	InetAddress[] endpoints = getReadStorageEndPoints(key);
     	
-    	for ( EndPoint endpoint : endpoints )
+    	for ( InetAddress endpoint : endpoints )
     	{
     		if ( FailureDetector.instance().isAlive(endpoint) )
     			liveEps.add(endpoint);
@@ -853,7 +788,7 @@
      * @param key - key for which we need to find the endpoint return value -
      * the endpoint responsible for this key
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndpointMap(String key, EndPoint[] naturalEndpoints)
+    public Map<InetAddress, InetAddress> getHintedStorageEndpointMap(String key, InetAddress[] naturalEndpoints)
     {
         return replicationStrategy_.getHintedStorageEndPoints(partitioner_.getToken(key), naturalEndpoints);
     }
@@ -862,12 +797,12 @@
      * This function finds the most suitable endpoint given a key.
      * It checks for locality and alive test.
      */
-	public EndPoint findSuitableEndPoint(String key) throws IOException, UnavailableException
+	public InetAddress findSuitableEndPoint(String key) throws IOException, UnavailableException
 	{
-		EndPoint[] endpoints = getReadStorageEndPoints(key);
-		for(EndPoint endPoint: endpoints)
+		InetAddress[] endpoints = getReadStorageEndPoints(key);
+		for(InetAddress endPoint: endpoints)
 		{
-			if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+            if(endPoint.equals(FBUtilities.getLocalAddress()))
 			{
 				return endPoint;
 			}
@@ -889,7 +824,7 @@
 			if ( FailureDetector.instance().isAlive(endpoints[j]))
 			{
 				if (logger_.isDebugEnabled())
-				  logger_.debug("EndPoint " + endpoints[j] + " is alive so get data from it.");
+				  logger_.debug("InetAddress " + endpoints[j] + " is alive so get data from it.");
 				return endpoints[j];
 			}
 		}
@@ -897,7 +832,7 @@
         throw new UnavailableException(); // no nodes that could contain key are alive
 	}
 
-	Map<Token, EndPoint> getLiveEndPointMap()
+	Map<Token, InetAddress> getLiveEndPointMap()
 	{
 	    return tokenMetadata_.cloneTokenEndPointMap();
 	}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
 import java.util.Set;
 
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 
 
 public interface StorageServiceMBean
@@ -85,17 +85,6 @@
     public void forceTableCleanup() throws IOException;
 
     /**
-     * Stream the files in the bootstrap directory over to the
-     * node being bootstrapped. This is used in case of normal
-     * bootstrap failure. Use a tool to re-calculate the cardinality
-     * at a later point at the destination.
-     * @param directories colon separated list of directories from where 
-     *                files need to be picked up.
-     * @param target endpoint receiving data.
-    */
-    public void forceHandoff(List<String> directories, String target) throws IOException;
-
-    /**
      * Takes the snapshot for a given table.
      * 
      * @param tableName the name of the table.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StreamManager.java Wed Oct 21 18:26:02 2009
@@ -22,14 +22,14 @@
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import java.net.InetAddress;
+
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.io.StreamContextManager;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
 import org.apache.log4j.Logger;
 
 /*
@@ -40,9 +40,9 @@
 {   
     private static Logger logger_ = Logger.getLogger( StreamManager.class );
         
-    private static Map<EndPoint, StreamManager> streamManagers_ = new HashMap<EndPoint, StreamManager>();
+    private static Map<InetAddress, StreamManager> streamManagers_ = new HashMap<InetAddress, StreamManager>();
     
-    public static StreamManager instance(EndPoint to)
+    public static StreamManager instance(InetAddress to)
     {
         StreamManager streamManager = streamManagers_.get(to);
         if ( streamManager == null )
@@ -54,10 +54,10 @@
     }
     
     private List<File> filesToStream_ = new ArrayList<File>();
-    private EndPoint to_;
+    private InetAddress to_;
     private long totalBytesToStream_ = 0L;
     
-    private StreamManager(EndPoint to)
+    private StreamManager(InetAddress to)
     {
         to_ = to;
     }
@@ -80,7 +80,7 @@
             File file = filesToStream_.get(0);
             if (logger_.isDebugEnabled())
               logger_.debug("Streaming file " + file + " ...");
-            MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), StorageService.getLocalStorageEndPoint(), to_);
+            MessagingService.instance().stream(file.getAbsolutePath(), 0L, file.length(), FBUtilities.getLocalAddress(), to_);
         }
     }
     

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/KeyChecker.java Wed Oct 21 18:26:02 2009
@@ -25,7 +25,7 @@
 
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.Table;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.LogUtil;
@@ -40,9 +40,9 @@
      */
     private static boolean checkIfProcessKey(String key)
     {
-        EndPoint[] endPoints = StorageService.instance().getReadStorageEndPoints(key);
-        EndPoint localEndPoint = StorageService.getLocalStorageEndPoint();
-        for(EndPoint endPoint : endPoints)
+        InetAddress[] endPoints = StorageService.instance().getReadStorageEndPoints(key);
+        InetAddress localEndPoint = FBUtilities.getLocalAddress();
+        for(InetAddress endPoint : endPoints)
         {
             if(endPoint.equals(localEndPoint))
                 return true;
@@ -65,7 +65,7 @@
         /* Sleep for proper discovery */
         Thread.sleep(240000);
         /* Create the file for the missing keys */
-        RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getHostAddress() + ".dat", "rw");
+        RandomAccessFile raf = new RandomAccessFile( "Missing-" + FBUtilities.getLocalAddress() + ".dat", "rw");
         
         /* Start reading the file that contains the keys */
         BufferedReader bufReader = new BufferedReader( new InputStreamReader( new FileInputStream(args[0]) ), KeyChecker.bufSize_ );

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/NodeProbe.java Wed Oct 21 18:26:02 2009
@@ -45,7 +45,7 @@
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.db.CompactionManagerMBean;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/tools/TokenUpdater.java Wed Oct 21 18:26:02 2009
@@ -23,7 +23,9 @@
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.SelectorManager;
@@ -61,20 +63,20 @@
             port = Integer.valueOf(ipPortPair[1]);
         }
 
-        EndPoint target = new EndPoint(ipPortPair[0], port);
+        InetSocketAddress target = new InetSocketAddress(ipPortPair[0], port);
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
         Token.serializer().serialize(token, dos);
 
         /* Construct the token update message to be sent */
-        Message tokenUpdateMessage = new Message(new EndPoint(FBUtilities.getHostAddress(), port_),
+        Message tokenUpdateMessage = new Message(target.getAddress(),
                                                  "",
                                                  StorageService.tokenVerbHandler_,
                                                  bos.toByteArray());
 
         System.out.println("Sending a token update message to " + target);
-        MessagingService.instance().sendOneWay(tokenUpdateMessage, target);
+        MessagingService.instance().sendOneWay(tokenUpdateMessage, target.getAddress());
         Thread.sleep(TokenUpdater.waitTime_);
         System.out.println("Done sending the update message");
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Oct 21 18:26:02 2009
@@ -23,8 +23,6 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.security.MessageDigest;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
@@ -51,23 +49,22 @@
         return result.toArray( new String[0] );
     }
 
-    public static InetAddress getLocalAddress() throws UnknownHostException
+    public static InetAddress getLocalAddress()
     {
-	if ( localInetAddress_ == null )
-		localInetAddress_ = InetAddress.getLocalHost();
+        if (localInetAddress_ == null)
+            try
+            {
+                localInetAddress_ = DatabaseDescriptor.getListenAddress() == null
+                                    ? InetAddress.getLocalHost()
+                                    : DatabaseDescriptor.getListenAddress();
+            }
+            catch (UnknownHostException e)
+            {
+                throw new RuntimeException(e);
+            }
         return localInetAddress_;
     }
 
-    public static String getHostAddress() throws UnknownHostException
-    {
-        InetAddress inetAddr = getLocalAddress();
-        if (DatabaseDescriptor.getListenAddress() != null)
-        {
-            inetAddr = InetAddress.getByName(DatabaseDescriptor.getListenAddress());
-        }
-        return inetAddr.getHostAddress();
-    }
-
     public static byte[] toByteArray(int i)
     {
         byte[] bytes = new byte[4];

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/client/TestRingCache.java Wed Oct 21 18:26:02 2009
@@ -17,10 +17,11 @@
  */
 package org.apache.cassandra.client;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.service.Cassandra;
 import org.apache.cassandra.service.Column;
 import org.apache.cassandra.service.ColumnPath;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
@@ -64,14 +65,14 @@
             String row = "row" + nRows;
             ColumnPath col = new ColumnPath("Standard1", null, "col1".getBytes());
 
-            EndPoint endPoints[] = ringCache.getEndPoint(row);
+            InetAddress endPoints[] = ringCache.getEndPoint(row);
             String hosts="";
             for (int i=0; i<endPoints.length; i++)
                 hosts = hosts + ((i>0) ? "," : "") + endPoints[i];
             System.out.println("hosts with key " + row + " : " + hosts + "; choose " + endPoints[0]);
         
             // now, read the row back directly from the host owning the row locally
-            setup(endPoints[0].getHost(), endPoints[0].getPort());
+            setup(endPoints[0].getHostAddress(), DatabaseDescriptor.getThriftPort());
             thriftClient.insert(table, row, col, "val1".getBytes(), 1, 1);
             Column column=thriftClient.get(table, row, col, 1).column;
             System.out.println("read row " + row + " " + new String(column.name) + ":" + new String(column.value) + ":" + column.timestamp);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Oct 21 18:26:02 2009
@@ -31,7 +31,7 @@
 
 import static junit.framework.Assert.assertEquals;
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.CollatingOrderPreservingPartitioner;
@@ -123,7 +123,7 @@
         Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
         ranges.add(r);
 
-        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1", 9150));
+        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, InetAddress.getByName("127.0.0.1"));
         assert fileList.size() >= 1;
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootStrapperTest.java Wed Oct 21 18:26:02 2009
@@ -23,26 +23,28 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.cassandra.service.StorageService;
 import org.junit.Test;
 
 public class BootStrapperTest {
     @Test
-    public void testSourceTargetComputation() 
+    public void testSourceTargetComputation() throws UnknownHostException
     {
         int numOldNodes = 3;
         IPartitioner p = generateOldTokens(numOldNodes);
         
         Token newToken = p.getDefaultToken();
-        EndPoint newEndPoint = new EndPoint("1.2.3.10",100);
+        InetAddress newEndPoint = InetAddress.getByName("1.2.3.10");
  
         /* New token needs to be part of the map for the algorithm
          * to calculate the ranges correctly
          */
         StorageService.instance().updateTokenMetadataUnsafe(newToken, newEndPoint);
 
-        BootStrapper b = new BootStrapper(new EndPoint[]{newEndPoint}, newToken );
+        BootStrapper b = new BootStrapper(new InetAddress[]{newEndPoint}, newToken );
         Map<Range,List<BootstrapSourceTarget>> res = b.getRangesWithSourceTarget();
         
         int transferCount = 0;
@@ -55,23 +57,23 @@
         }
         /* Only 1 transfer from old node to new node */
         assertEquals(1, transferCount);
-        Map<EndPoint, Map<EndPoint,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
+        Map<InetAddress, Map<InetAddress,List<Range>>> temp = LeaveJoinProtocolHelper.getWorkMap(res);
         assertEquals(1, temp.keySet().size());
         assertEquals(1, temp.entrySet().size());
 
-        Map<EndPoint,Map<EndPoint,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
+        Map<InetAddress,Map<InetAddress,List<Range>>> res2 = LeaveJoinProtocolHelper.filterRangesForTargetEndPoint(temp, newEndPoint);
         /* After filtering, still only 1 transfer */
         assertEquals(1, res2.keySet().size());
         assertEquals(1, res2.entrySet().size());
-        assertTrue(((Map<EndPoint,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
+        assertTrue(((Map<InetAddress,List<Range>>)res2.values().toArray()[0]).containsKey(newEndPoint));
     }
 
-    private IPartitioner generateOldTokens(int numOldNodes)
+    private IPartitioner generateOldTokens(int numOldNodes) throws UnknownHostException
     {
         IPartitioner p = new RandomPartitioner();
         for (int i = 0 ; i< numOldNodes; i++)
         {
-            EndPoint e  = new EndPoint("127.0.0."+i, 100);
+            InetAddress e = InetAddress.getByName("127.0.0." + i);
             Token t = p.getDefaultToken();
             StorageService.instance().updateTokenMetadataUnsafe(t, e);
         }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/dht/BootstrapTest.java Wed Oct 21 18:26:02 2009
@@ -30,7 +30,7 @@
 
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.apache.cassandra.net.io.StreamContextManager;
 import org.apache.cassandra.io.SSTableReader;
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/gms/GossipDigestTest.java Wed Oct 21 18:26:02 2009
@@ -27,7 +27,7 @@
 
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
 import org.junit.Test;
 
 public class GossipDigestTest
@@ -36,7 +36,7 @@
     @Test
     public void test() throws IOException
     {
-        EndPoint endPoint = new EndPoint("127.0.0.1", 3333);
+        InetAddress endPoint = InetAddress.getByName("127.0.0.1");
         int generation = 0;
         int maxVersion = 123;
         GossipDigest expected = new GossipDigest(endPoint, generation, maxVersion);

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/locator/RackUnawareStrategyTest.java Wed Oct 21 18:26:02 2009
@@ -31,12 +31,13 @@
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.dht.StringToken;
-import org.apache.cassandra.net.EndPoint;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 public class RackUnawareStrategyTest
 {
     @Test
-    public void testBigIntegerStorageEndPoints()
+    public void testBigIntegerStorageEndPoints() throws UnknownHostException
     {
         TokenMetadata tmd = new TokenMetadata();
         IPartitioner partitioner = new RandomPartitioner();
@@ -52,7 +53,7 @@
     }
 
     @Test
-    public void testStringStorageEndPoints()
+    public void testStringStorageEndPoints() throws UnknownHostException
     {
         TokenMetadata tmd = new TokenMetadata();
         IPartitioner partitioner = new OrderPreservingPartitioner();
@@ -69,19 +70,19 @@
 
     // given a list of endpoint tokens, and a set of key tokens falling between the endpoint tokens,
     // make sure that the Strategy picks the right endpoints for the keys.
-    private void testGetStorageEndPoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens)
+    private void testGetStorageEndPoints(TokenMetadata tmd, AbstractReplicationStrategy strategy, Token[] endPointTokens, Token[] keyTokens) throws UnknownHostException
     {
-        List<EndPoint> hosts = new ArrayList<EndPoint>();
+        List<InetAddress> hosts = new ArrayList<InetAddress>();
         for (int i = 0; i < endPointTokens.length; i++)
         {
-            EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 7001);
+            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
             tmd.update(endPointTokens[i], ep);
             hosts.add(ep);
         }
 
         for (int i = 0; i < keyTokens.length; i++)
         {
-            EndPoint[] endPoints = strategy.getReadStorageEndPoints(keyTokens[i]);
+            InetAddress[] endPoints = strategy.getReadStorageEndPoints(keyTokens[i]);
             assertEquals(3, endPoints.length);
             for (int j = 0; j < endPoints.length; j++)
             {
@@ -91,7 +92,7 @@
     }
     
     @Test
-    public void testGetStorageEndPointsDuringBootstrap()
+    public void testGetStorageEndPointsDuringBootstrap() throws UnknownHostException
     {
         TokenMetadata tmd = new TokenMetadata();
         IPartitioner partitioner = new RandomPartitioner();
@@ -106,24 +107,24 @@
             keyTokens[i] = new BigIntegerToken(String.valueOf(10 * i + 5));
         }
         
-        List<EndPoint> hosts = new ArrayList<EndPoint>();
+        List<InetAddress> hosts = new ArrayList<InetAddress>();
         for (int i = 0; i < endPointTokens.length; i++)
         {
-            EndPoint ep = new EndPoint("127.0.0." + String.valueOf(i + 1), 7001);
+            InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
             tmd.update(endPointTokens[i], ep);
             hosts.add(ep);
         }
         
         //Add bootstrap node id=6
         Token bsToken = new BigIntegerToken(String.valueOf(25));
-        EndPoint bootstrapEndPoint = new EndPoint("127.0.0.6", 7001);
+        InetAddress bootstrapEndPoint = InetAddress.getByName("127.0.0.6");
         tmd.update(bsToken, bootstrapEndPoint, true);
         
         for (int i = 0; i < keyTokens.length; i++)
         {
-            EndPoint[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i], strategy.getReadStorageEndPoints(keyTokens[i]));
+            InetAddress[] endPoints = strategy.getWriteStorageEndPoints(keyTokens[i], strategy.getReadStorageEndPoints(keyTokens[i]));
             assertTrue(endPoints.length >=3);
-            List<EndPoint> endPointsList = Arrays.asList(endPoints);
+            List<InetAddress> endPointsList = Arrays.asList(endPoints);
 
             for (int j = 0; j < 3; j++)
             {

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java?rev=828130&r1=828129&r2=828130&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/net/CompactEndPointSerializationHelperTest.java Wed Oct 21 18:26:02 2009
@@ -1,6 +1,7 @@
 package org.apache.cassandra.net;
 
 import java.net.UnknownHostException;
+import java.net.InetAddress;
 
 import org.junit.Test;
 
@@ -11,10 +12,10 @@
     @Test
     public void testSerialize() throws UnknownHostException
     {
-        EndPoint ep = new EndPoint(FBUtilities.getHostAddress(), 7000);
+        InetAddress ep = FBUtilities.getLocalAddress();
         byte[] bytes = ep.getAddress();
         System.out.println(bytes.length);
-        EndPoint ep2 = EndPoint.getByAddress(bytes);
+        InetAddress ep2 = InetAddress.getByAddress(bytes);
         System.out.println(ep2);
     }
 }



Mime
View raw message