cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r758999 [1/2] - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/ src/org/apache/cassandra/dht/ src/org/apache/cassandra/io/ src/org/apache/cassandra/locator/ src/org/apache/cassandra/service/ src/org/apache/cassandra/tools/ src/o...
Date Fri, 27 Mar 2009 02:44:45 GMT
Author: jbellis
Date: Fri Mar 27 02:44:44 2009
New Revision: 758999

URL: http://svn.apache.org/viewvc?rev=758999&view=rev
Log:
Migrate from BigInteger to an abstract Token class, with BigIntegerToken and StringToken subclasses produced by the Random and OrderPreserving partitioners, respectively.

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/service/IPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/OrderPreservingHashPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/RandomPartitioner.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
    incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
    incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/dht/RangeTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Mar 27 02:44:44 2009
@@ -1118,7 +1118,7 @@
 	                            continue;
 	                    	}
 	                    }
-	                    if ( Range.isKeyInRanges(ranges, lastkey) )
+	                    if ( Range.isKeyInRanges(lastkey, ranges) )
 	                    {
 	                        if(ssTableRange == null )
 	                        {
@@ -1148,7 +1148,7 @@
 	                    			continue;
 	                    		}
 	                    		/* keep on looping until we find a key in the range */
-	                            while ( !Range.isKeyInRanges(ranges, filestruct.key_ ) )
+	                            while ( !Range.isKeyInRanges(filestruct.key_, ranges) )
 	                            {
 		                    		filestruct = getNextKey	( filestruct );
 		                    		if(filestruct == null)
@@ -1156,7 +1156,7 @@
 		                    			break;
 		                    		}
 	        	                    /* check if we need to continue , if we are done with ranges empty the queue and close all file handles and exit */
-	        	                    //if( !isLoop && StorageService.hash(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
+	        	                    //if( !isLoop && StorageService.token(filestruct.key).compareTo(maxRange.right()) > 0 && !filestruct.key.equals(""))
 	        	                    //{
 	                                    //filestruct.reader.close();
 	                                    //filestruct = null;

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/DBManager.java Fri Mar 27 02:44:44 2009
@@ -19,20 +19,17 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.GuidGenerator;
 
 
 /**
@@ -64,23 +61,23 @@
 
     public static class StorageMetadata
     {
-        private BigInteger storageId_;
+        private Token myToken;
         private int generation_;
 
-        StorageMetadata(BigInteger storageId, int generation)
+        StorageMetadata(Token storageId, int generation)
         {
-            storageId_ = storageId;
+            myToken = storageId;
             generation_ = generation;
         }
 
-        public BigInteger getStorageId()
+        public Token getStorageId()
         {
-            return storageId_;
+            return myToken;
         }
 
-        public void setStorageId(BigInteger storageId)
+        public void setStorageId(Token storageId)
         {
-            storageId_ = storageId;
+            myToken = storageId;
         }
 
         public int getGeneration()
@@ -117,22 +114,17 @@
         SystemTable sysTable = SystemTable.openSystemTable(SystemTable.name_);
         Row row = sysTable.get(FBUtilities.getHostName());
 
-        Random random = new Random();
+        IPartitioner p = StorageService.getPartitioner();
         if ( row == null )
         {
-        	/* Generate a token for this Storage node */                       
-            String guid = GuidGenerator.guid();
-            BigInteger token = StorageService.hash(guid);
-            if ( token.signum() == -1 )
-                token = token.multiply(BigInteger.valueOf(-1L));
-
+            Token token = p.getDefaultToken();
             int generation = 1;
 
             String key = FBUtilities.getHostName();
             row = new Row(key);
             ColumnFamily cf = new ColumnFamily(SystemTable.cfName_);
-            cf.addColumn(new Column(SystemTable.token_, token.toByteArray()));
-            cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)));
+            cf.addColumn(new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token)));
+            cf.addColumn(new Column(SystemTable.generation_, BasicUtilities.intToByteArray(generation)) );
             row.addColumnFamily(cf);
             sysTable.apply(row);
             storageMetadata = new StorageMetadata( token, generation);
@@ -147,14 +139,15 @@
             {
             	ColumnFamily columnFamily = columnFamilies.get(cfName);
 
-                IColumn token = columnFamily.getColumn(SystemTable.token_);
-                BigInteger bi = new BigInteger( token.value() );
+                IColumn tokenColumn = columnFamily.getColumn(SystemTable.token_);
+                Token token = p.getTokenFactory().fromByteArray(tokenColumn.value());
 
                 IColumn generation = columnFamily.getColumn(SystemTable.generation_);
                 int gen = BasicUtilities.byteArrayToInt(generation.value()) + 1;
 
-                columnFamily.addColumn(new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1));
-                storageMetadata = new StorageMetadata( bi, gen );
+                Column generation2 = new Column("Generation", BasicUtilities.intToByteArray(gen), generation.timestamp() + 1);
+                columnFamily.addColumn(generation2);
+                storageMetadata = new StorageMetadata(token, gen);
                 break;
             }
             sysTable.reset(row);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/FileStruct.java Fri Mar 27 02:44:44 2009
@@ -19,15 +19,12 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.math.BigInteger;
 
-import org.apache.cassandra.continuations.Suspendable;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
 import org.apache.cassandra.io.IFileReader;
 import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.io.SequenceFile;
-import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.service.StorageService;
 
 
@@ -101,7 +98,7 @@
 
     public int compareTo(FileStruct f)
     {
-        return -StorageService.getPartitioner().getReverseDecoratedKeyComparator().compare(key_, f.key_);
+        return StorageService.getPartitioner().getDecoratedKeyComparator().compare(key_, f.key_);
     }
     
     public void close() throws IOException

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SystemTable.java Fri Mar 27 02:44:44 2009
@@ -19,10 +19,11 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.io.DataOutputBuffer;
@@ -31,7 +32,8 @@
 import org.apache.cassandra.io.SequenceFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.IPartitioner;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -148,8 +150,9 @@
      * This method is used to update the SystemTable with the
      * new token.
     */
-    public void updateToken(BigInteger token) throws IOException
+    public void updateToken(Token token) throws IOException
     {
+        IPartitioner p = StorageService.getPartitioner();
         if ( systemRow_ != null )
         {
             Map<String, ColumnFamily> columnFamilies = systemRow_.getColumnFamilyMap();
@@ -157,9 +160,9 @@
             ColumnFamily columnFamily = columnFamilies.get(SystemTable.cfName_);
             long oldTokenColumnTimestamp = columnFamily.getColumn(SystemTable.token_).timestamp();
             /* create the "Token" whose value is the new token. */
-            IColumn tokenColumn = new Column(SystemTable.token_, token.toByteArray(), oldTokenColumnTimestamp + 1);
+            IColumn tokenColumn = new Column(SystemTable.token_, p.getTokenFactory().toByteArray(token), oldTokenColumnTimestamp + 1);
             /* replace the old "Token" column with this new one. */
-            logger_.debug("Replacing old token " + new BigInteger( columnFamily.getColumn(SystemTable.token_).value() ).toString() + " with token " + token.toString());
+            logger_.debug("Replacing old token " + p.getTokenFactory().fromByteArray(columnFamily.getColumn(SystemTable.token_).value()) + " with " + token);
             columnFamily.addColumn(tokenColumn);
             reset(systemRow_);
         }
@@ -180,17 +183,7 @@
     {
         LogUtil.init();
         StorageService.instance().start();
-        SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+        SystemTable.openSystemTable(SystemTable.cfName_).updateToken(StorageService.token("503545744:0"));
         System.out.println("Done");
-
-        /*
-        BigInteger hash = StorageService.hash("304700067:0");
-        List<Range> ranges = new ArrayList<Range>();
-        ranges.add( new Range(new BigInteger("1218069462158869448693347920504606362273788442553"), new BigInteger("1092770595533781724218060956188429069")) );
-        if ( Range.isKeyInRanges(ranges, "304700067:0") )
-        {
-            System.out.println("Done");
-        }
-        */
     }
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BigIntegerToken.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,16 @@
+package org.apache.cassandra.dht;
+
+import java.math.BigInteger;
+
+public class BigIntegerToken extends Token<BigInteger>
+{
+    public BigIntegerToken(BigInteger token)
+    {
+        super(token);
+    }
+
+    // convenience method for testing
+    public BigIntegerToken(String token) {
+        this(new BigInteger(token));
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/BootStrapper.java Fri Mar 27 02:44:44 2009
@@ -18,24 +18,20 @@
 
 package org.apache.cassandra.dht;
 
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
 
 
 /**
@@ -48,18 +44,18 @@
     /* endpoints that need to be bootstrapped */
     protected EndPoint[] targets_ = new EndPoint[0];
     /* tokens of the nodes being bootstapped. */
-    protected BigInteger[] tokens_ = new BigInteger[0];
+    protected final Token[] tokens_;
     protected TokenMetadata tokenMetadata_ = null;
     private List<EndPoint> filters_ = new ArrayList<EndPoint>();
 
-    public BootStrapper(EndPoint[] target, BigInteger[] token)
+    public BootStrapper(EndPoint[] target, Token... token)
     {
         targets_ = target;
         tokens_ = token;
         tokenMetadata_ = StorageService.instance().getTokenMetadata();
     }
     
-    public BootStrapper(EndPoint[] target, BigInteger[] token, EndPoint[] filters)
+    public BootStrapper(EndPoint[] target, Token[] token, EndPoint[] filters)
     {
         this(target, token);
         Collections.addAll(filters_, filters);
@@ -71,14 +67,14 @@
         {
             logger_.debug("Beginning bootstrap process for " + targets_ + " ...");                                                               
             /* copy the token to endpoint map */
-            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
             /* remove the tokens associated with the endpoints being bootstrapped */                
-            for ( BigInteger token : tokens_ )
+            for (Token token : tokens_)
             {
                 tokenToEndPointMap.remove(token);                    
             }
 
-            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
             Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
             logger_.debug("Total number of old ranges " + oldRanges.length);
             /* 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/IPartitioner.java Fri Mar 27 02:44:44 2009
@@ -28,6 +28,8 @@
 
     public String undecorateKey(String decoratedKey);
 
+    public Comparator<String> getDecoratedKeyComparator();
+
     public Comparator<String> getReverseDecoratedKeyComparator();
 
     public Token getTokenForKey(String key);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolHelper.java Fri Mar 27 02:44:44 2009
@@ -18,19 +18,19 @@
 
 package org.apache.cassandra.dht;
 
-import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.log4j.Logger;
+ import java.io.IOException;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.net.Message;
+ import org.apache.cassandra.net.MessagingService;
 
 
 class LeaveJoinProtocolHelper
@@ -42,20 +42,20 @@
      * a-----x-----y-----b then we want a mapping from 
      * (a, b] --> (a, x], (x, y], (y, b] 
     */
-    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, BigInteger[] allTokens)
+    protected static Map<Range, List<Range>> getRangeSplitRangeMapping(Range[] oldRanges, Token[] allTokens)
     {
         Map<Range, List<Range>> splitRanges = new HashMap<Range, List<Range>>();
-        BigInteger[] tokens = new BigInteger[allTokens.length];
+        Token[] tokens = new Token[allTokens.length];
         System.arraycopy(allTokens, 0, tokens, 0, tokens.length);
         Arrays.sort(tokens);
         
         Range prevRange = null;
-        BigInteger prevToken = null;
+        Token prevToken = null;
         boolean bVal = false;
         
         for ( Range oldRange : oldRanges )
         {
-            if ( bVal && prevRange != null )
+            if (bVal)
             {
                 bVal = false; 
                 List<Range> subRanges = splitRanges.get(prevRange);
@@ -65,7 +65,7 @@
             
             prevRange = oldRange;
             prevToken = oldRange.left();                
-            for ( BigInteger token : tokens )
+            for (Token token : tokens)
             {     
                 List<Range> subRanges = splitRanges.get(oldRange);
                 if ( oldRange.contains(token) )

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/LeaveJoinProtocolImpl.java Fri Mar 27 02:44:44 2009
@@ -18,21 +18,20 @@
 
 package org.apache.cassandra.dht;
 
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
 
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
+ import org.apache.log4j.Logger;
+
+ import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.net.EndPoint;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.LogUtil;
 
 
 /**
@@ -48,11 +47,11 @@
     /* endpoints that are to be moved. */
     protected EndPoint[] targets_ = new EndPoint[0];
     /* position where they need to be moved */
-    protected BigInteger[] tokens_ = new BigInteger[0];
+    protected final Token[] tokens_;
     /* token metadata information */
     protected TokenMetadata tokenMetadata_ = null;
 
-    public LeaveJoinProtocolImpl(EndPoint[] targets, BigInteger[] tokens)
+    public LeaveJoinProtocolImpl(EndPoint[] targets, Token[] tokens)
     {
         targets_ = targets;
         tokens_ = tokens;
@@ -65,24 +64,24 @@
         {
             logger_.debug("Beginning leave/join process for ...");                                                               
             /* copy the token to endpoint map */
-            Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+            Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
             /* copy the endpoint to token map */
-            Map<EndPoint, BigInteger> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
+            Map<EndPoint, Token> endpointToTokenMap = tokenMetadata_.cloneEndPointTokenMap();
             
-            Set<BigInteger> oldTokens = new HashSet<BigInteger>( tokenToEndPointMap.keySet() );
+            Set<Token> oldTokens = new HashSet<Token>( tokenToEndPointMap.keySet() );
             Range[] oldRanges = StorageService.instance().getAllRanges(oldTokens);
             logger_.debug("Total number of old ranges " + oldRanges.length);
             /* Calculate the list of nodes that handle the old ranges */
             Map<Range, List<EndPoint>> oldRangeToEndPointMap = StorageService.instance().constructRangeToEndPointMap(oldRanges);
             
             /* Remove the tokens of the nodes leaving the ring */
-            Set<BigInteger> tokens = getTokensForLeavingNodes();
+            Set<Token> tokens = getTokensForLeavingNodes();
             oldTokens.removeAll(tokens);
             Range[] rangesAfterNodesLeave = StorageService.instance().getAllRanges(oldTokens);
             /* Get expanded range to initial range mapping */
             Map<Range, List<Range>> expandedRangeToOldRangeMap = getExpandedRangeToOldRangeMapping(oldRanges, rangesAfterNodesLeave);
             /* add the new token positions to the old tokens set */
-            for ( BigInteger token : tokens_ )
+            for (Token token : tokens_)
                 oldTokens.add(token);
             Range[] rangesAfterNodesJoin = StorageService.instance().getAllRanges(oldTokens);
             /* replace the ranges that were split with the split ranges in the old configuration */
@@ -196,12 +195,12 @@
         }        
     }
     
-    private Set<BigInteger> getTokensForLeavingNodes()
+    private Set<Token> getTokensForLeavingNodes()
     {
-        Set<BigInteger> tokens = new HashSet<BigInteger>();
+        Set<Token> tokens = new HashSet<Token>();
         for ( EndPoint target : targets_ )
         {
-            tokens.add( tokenMetadata_.getToken(target) );
+            tokens.add(tokenMetadata_.getToken(target));
         }        
         return tokens;
     }
@@ -276,16 +275,16 @@
     public static void main(String[] args) throws Throwable
     {
         StorageService ss = StorageService.instance();
-        ss.updateTokenMetadata(BigInteger.valueOf(3), new EndPoint("A", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(6), new EndPoint("B", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(9), new EndPoint("C", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(12), new EndPoint("D", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(15), new EndPoint("E", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(18), new EndPoint("F", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(21), new EndPoint("G", 7000));
-        ss.updateTokenMetadata(BigInteger.valueOf(24), new EndPoint("H", 7000)); 
+        ss.updateTokenMetadata(new BigIntegerToken("3"), new EndPoint("A", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("6"), new EndPoint("B", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("9"), new EndPoint("C", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("12"), new EndPoint("D", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("15"), new EndPoint("E", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("18"), new EndPoint("F", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("21"), new EndPoint("G", 7000));
+        ss.updateTokenMetadata(new BigIntegerToken("24"), new EndPoint("H", 7000));
         
-        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new BigInteger[]{BigInteger.valueOf(22), BigInteger.valueOf(23)} );
+        Runnable runnable = new LeaveJoinProtocolImpl( new EndPoint[]{new EndPoint("C", 7000), new EndPoint("D", 7000)}, new Token[]{new BigIntegerToken("22"), new BigIntegerToken("23")} );
         runnable.run();
     }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/OrderPreservingPartitioner.java Fri Mar 27 02:44:44 2009
@@ -19,15 +19,27 @@
 package org.apache.cassandra.dht;
 
 import java.io.UnsupportedEncodingException;
+import java.text.Collator;
 import java.util.Comparator;
+import java.util.Locale;
 import java.util.Random;
 
 public class OrderPreservingPartitioner implements IPartitioner
 {
+    // TODO make locale configurable.  But don't just leave it up to the OS or you could really screw
+    // people over if they deploy on nodes with different OS locales.
+    static final Collator collator = Collator.getInstance(new Locale("en", "US")); 
+
     private static final Comparator<String> comparator = new Comparator<String>() {
         public int compare(String o1, String o2)
         {
-            return o2.compareTo(o1);
+            return collator.compare(o1, o2);
+        }
+    };
+    private static final Comparator<String> reverseComparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+            return -comparator.compare(o1, o2);
         }
     };
 
@@ -41,11 +53,16 @@
         return decoratedKey;
     }
 
-    public Comparator<String> getReverseDecoratedKeyComparator()
+    public Comparator<String> getDecoratedKeyComparator()
     {
         return comparator;
     }
 
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return reverseComparator;
+    }
+
     public StringToken getDefaultToken()
     {
         String chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/RandomPartitioner.java Fri Mar 27 02:44:44 2009
@@ -25,7 +25,6 @@
 import org.apache.cassandra.utils.GuidGenerator;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.dht.BigIntegerToken;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * This class generates a MD5 hash of the key. It uses the standard technique
@@ -41,7 +40,13 @@
         {
             BigInteger i1 = new BigInteger(o1.split(":")[0]);
             BigInteger i2 = new BigInteger(o2.split(":")[0]);
-            return i2.compareTo(i1);
+            return i1.compareTo(i2);
+        }
+    };
+    private static final Comparator<String> reverseComparator = new Comparator<String>() {
+        public int compare(String o1, String o2)
+        {
+           return -comparator.compare(o1, o2);
         }
     };
 
@@ -60,11 +65,16 @@
         return decoratedKey.split(":")[1];
     }
 
-    public Comparator<String> getReverseDecoratedKeyComparator()
+    public Comparator<String> getDecoratedKeyComparator()
     {
         return comparator;
     }
 
+    public Comparator<String> getReverseDecoratedKeyComparator()
+    {
+        return reverseComparator;
+    }
+
     public BigIntegerToken getDefaultToken()
     {
         String guid = GuidGenerator.guid();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Range.java Fri Mar 27 02:44:44 2009
@@ -21,17 +21,9 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.gms.GossipDigest;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.IFileReader;
-import org.apache.cassandra.io.IFileWriter;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
-import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 
 
@@ -51,28 +43,12 @@
     public static ICompactSerializer<Range> serializer()
     {
         return serializer_;
-    }
-    
-    public static boolean isKeyInRanges(List<Range> ranges, String key)
-    {
-        if(ranges == null ) 
-            return false;
-        
-        for ( Range range : ranges)
-        {
-            if(range.contains(StorageService.hash(key)))
-            {
-                return true ;
-            }
-        }
-        return false;
-    }
-        
-    
-    private BigInteger left_;
-    private BigInteger right_;
+    }       
+
+    private Token left_;
+    private Token right_;
     
-    public Range(BigInteger left, BigInteger right)
+    public Range(Token left, Token right)
     {
         left_ = left;
         right_ = right;
@@ -82,7 +58,7 @@
      * Returns the left endpoint of a range.
      * @return left endpoint
      */
-    public BigInteger left()
+    public Token left()
     {
         return left_;
     }
@@ -91,7 +67,7 @@
      * Returns the right endpoint of a range.
      * @return right endpoint
      */
-    public BigInteger right()
+    public Token right()
     {
         return right_;
     }
@@ -102,9 +78,9 @@
      * @param bi point in question
      * @return true if the point contains within the range else false.
      */
-    public boolean contains(BigInteger bi)
+    public boolean contains(Token bi)
     {
-        if ( left_.subtract(right_).signum() > 0 )
+        if ( left_.compareTo(right_) > 0 )
         {
             /* 
              * left is greater than right we are wrapping around.
@@ -114,16 +90,16 @@
              * (2) k < b -- return true
              * (3) b < k < a -- return false
             */
-            if ( bi.subtract(left_).signum() >= 0 )
+            if ( bi.compareTo(left_) >= 0 )
                 return true;
-            else return right_.subtract(bi).signum() > 0;
+            else return right_.compareTo(bi) > 0;
         }
-        else if ( left_.subtract(right_).signum() < 0 )
+        else if ( left_.compareTo(right_) < 0 )
         {
             /*
              * This is the range [a, b) where a < b. 
             */
-            return ( bi.subtract(left_).signum() >= 0 && right_.subtract(bi).signum() >=0 );
+            return ( bi.compareTo(left_) >= 0 && right_.compareTo(bi) >=0 );
         }        
         else
     	{
@@ -136,9 +112,9 @@
      * @param range
      * @return
      */
-    private boolean isWrapAround(Range range)
+    private static boolean isWrapAround(Range range)
     {
-        return range.left_.subtract(range.right_).signum() > 0;
+        return range.left_.compareTo(range.right_) > 0;
     }
     
     public int compareTo(Range rhs)
@@ -156,6 +132,22 @@
         return right_.compareTo(rhs.right_);
     }
     
+
+    public static boolean isKeyInRanges(String key, List<Range> ranges)
+    {
+        assert ranges != null;
+
+        Token token = StorageService.token(key);
+        for (Range range : ranges)
+        {
+            if(range.contains(token))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
     public boolean equals(Object o)
     {
         if ( !(o instanceof Range) )
@@ -178,15 +170,13 @@
 class RangeSerializer implements ICompactSerializer<Range>
 {
     public void serialize(Range range, DataOutputStream dos) throws IOException
-    {        
-        dos.writeUTF(range.left().toString());
-        dos.writeUTF(range.right().toString());
+    {
+        Token.serializer().serialize(range.left(), dos);
+        Token.serializer().serialize(range.right(), dos);
     }
 
     public Range deserialize(DataInputStream dis) throws IOException
     {
-        BigInteger left = new BigInteger(dis.readUTF());
-        BigInteger right = new BigInteger(dis.readUTF());        
-        return new Range(left, right);
+        return new Range(Token.serializer().deserialize(dis), Token.serializer().deserialize(dis));
     }
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/StringToken.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,14 @@
+package org.apache.cassandra.dht;
+
+public class StringToken extends Token<String>
+{
+    protected StringToken(String token)
+    {
+        super(token);
+    }
+
+    public int compareTo(Token<String> o)
+    {
+        return OrderPreservingPartitioner.collator.compare(this.token, o.token);
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java?rev=758999&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/dht/Token.java Fri Mar 27 02:44:44 2009
@@ -0,0 +1,77 @@
+package org.apache.cassandra.dht;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
+
+public abstract class Token<T extends Comparable> implements Comparable<Token<T>>
+{
+    private static final TokenSerializer serializer = new TokenSerializer();
+    public static TokenSerializer serializer()
+    {
+        return serializer;
+    }
+
+    T token;
+
+    protected Token(T token)
+    {
+        this.token = token;
+    }
+
+    /**
+     * This determines the comparison for node destination purposes.
+     */
+    public int compareTo(Token<T> o)
+    {
+        return token.compareTo(o.token);
+    }
+
+    public String toString()
+    {
+        return "Token(" + token + ")";
+    }
+
+    public boolean equals(Object obj)
+    {
+        if (!(obj instanceof Token)) {
+            return false;
+        }
+        return token.equals(((Token)obj).token);
+    }
+
+    public int hashCode()
+    {
+        return token.hashCode();
+    }
+
+    public static abstract class TokenFactory<T extends Comparable>
+    {
+        public abstract byte[] toByteArray(Token<T> token);
+        public abstract Token<T> fromByteArray(byte[] bytes);
+        public abstract Token<T> fromString(String string);
+    }
+
+    public static class TokenSerializer implements ICompactSerializer<Token>
+    {
+        public void serialize(Token token, DataOutputStream dos) throws IOException
+        {
+            IPartitioner p = StorageService.getPartitioner();
+            byte[] b = p.getTokenFactory().toByteArray(token);
+            dos.writeInt(b.length);
+            dos.write(b);
+        }
+
+        public Token deserialize(DataInputStream dis) throws IOException
+        {
+            IPartitioner p = StorageService.getPartitioner();
+            int size = dis.readInt();
+            byte[] bytes = new byte[size];
+            dis.readFully(bytes);
+            return p.getTokenFactory().fromByteArray(bytes);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SSTable.java Fri Mar 27 02:44:44 2009
@@ -18,20 +18,30 @@
 
 package org.apache.cassandra.io;
 
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Hashtable;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.IPartitioner;
-import org.apache.cassandra.service.PartitionerType;
 import org.apache.cassandra.utils.BasicUtilities;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.db.RowMutation;
-
-import org.apache.log4j.Logger;
+import org.apache.cassandra.dht.IPartitioner;
 
 /**
  * This class is built on top of the SequenceFile. It stores
@@ -162,7 +172,7 @@
         public int compareTo(KeyPositionInfo kPosInfo)
         {
             IPartitioner p = StorageService.getPartitioner();
-            return -p.getReverseDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
+            return p.getDecoratedKeyComparator().compare(decoratedKey, kPosInfo.decoratedKey);
         }
 
         public String toString()

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/io/SequenceFile.java Fri Mar 27 02:44:44 2009
@@ -658,33 +658,6 @@
         }
         
         /**
-         * This is useful in figuring out the key in system. If an OPHF 
-         * is used then the "key" is the application supplied key. If a random
-         * partitioning mechanism is used then the key is of the form 
-         * hash:key where hash is used internally as the key.
-         * 
-         * @param in the DataInput stream from which the key needs to be read
-         * @return the appropriate key based on partitioning type
-         * @throws IOException
-         */
-        protected String readKeyFromDisk(DataInput in) throws IOException
-        {
-            String keyInDisk = null;
-            PartitionerType pType = StorageService.getPartitionerType();
-            switch( pType )
-            {
-                case OPHF:
-                    keyInDisk = in.readUTF();                  
-                    break;
-                    
-                default:
-                    keyInDisk = in.readUTF().split(":")[0];
-                    break;
-            }
-            return keyInDisk;
-        }
-
-        /**
          * This method dumps the next key/value into the DataOuputStream
          * passed in. Always use this method to query for application
          * specific data as it will have indexes.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/AbstractStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
 package org.apache.cassandra.locator;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -8,11 +7,12 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
-import org.apache.log4j.Logger;
 
 /**
  * This class contains a helper method that will be used by
@@ -45,10 +45,10 @@
     protected EndPoint getNextAvailableEndPoint(EndPoint startPoint, List<EndPoint> topN, List<EndPoint> liveNodes)
     {
         EndPoint endPoint = null;
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List tokens = new ArrayList(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
-        BigInteger token = tokenMetadata_.getToken(startPoint);
+        Token token = tokenMetadata_.getToken(startPoint);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
         {
@@ -76,7 +76,7 @@
      * endpoint which is in the top N.
      * Get the map of top N to the live nodes currently.
      */
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token)
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token)
     {
         List<EndPoint> liveList = new ArrayList<EndPoint>();
         Map<EndPoint, EndPoint> map = new HashMap<EndPoint, EndPoint>();
@@ -107,6 +107,6 @@
         return map;
     }
 
-    public abstract EndPoint[] getStorageEndPoints(BigInteger token);
+    public abstract EndPoint[] getStorageEndPoints(Token token);
 
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/IReplicaPlacementStrategy.java Fri Mar 27 02:44:44 2009
@@ -18,9 +18,9 @@
 
 package org.apache.cassandra.locator;
 
-import java.math.BigInteger;
 import java.util.Map;
 
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 
 
@@ -32,7 +32,7 @@
  */
 public interface IReplicaPlacementStrategy
 {
-	public EndPoint[] getStorageEndPoints(BigInteger token);
-    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap);
-    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(BigInteger token);    
+	public EndPoint[] getStorageEndPoints(Token token);
+    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap);
+    public Map<EndPoint, EndPoint> getHintedStorageEndPoints(Token token);    
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackAwareStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
 package org.apache.cassandra.locator;
 
-import java.math.BigInteger;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -8,6 +7,7 @@
 import java.util.Map;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
@@ -27,7 +27,7 @@
         super(tokenMetadata);
     }
     
-    public EndPoint[] getStorageEndPoints(BigInteger token)
+    public EndPoint[] getStorageEndPoints(Token token)
     {
         int startIndex = 0 ;
         List<EndPoint> list = new ArrayList<EndPoint>();
@@ -35,8 +35,8 @@
         boolean bOtherRack = false;
         int foundCount = 0;
         int N = DatabaseDescriptor.getReplicationFactor();
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List tokens = new ArrayList(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)
@@ -107,7 +107,7 @@
         return list.toArray(new EndPoint[0]);
     }
 
-    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
         throw new UnsupportedOperationException("This operation is not currently supported");
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/RackUnawareStrategy.java Fri Mar 27 02:44:44 2009
@@ -1,6 +1,5 @@
 package org.apache.cassandra.locator;
 
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -8,6 +7,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 
 
@@ -27,18 +27,18 @@
         super(tokenMetadata);
     }
     
-    public EndPoint[] getStorageEndPoints(BigInteger token)
+    public EndPoint[] getStorageEndPoints(Token token)
     {
         return getStorageEndPoints(token, tokenMetadata_.cloneTokenEndPointMap());            
     }
     
-    public EndPoint[] getStorageEndPoints(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    public EndPoint[] getStorageEndPoints(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
         int startIndex = 0 ;
         List<EndPoint> list = new ArrayList<EndPoint>();
         int foundCount = 0;
         int N = DatabaseDescriptor.getReplicationFactor();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         if(index < 0)

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/locator/TokenMetadata.java Fri Mar 27 02:44:44 2009
@@ -18,18 +18,13 @@
 
 package org.apache.cassandra.locator;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.net.CompactEndPointSerializationHelper;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 
 
@@ -40,9 +35,9 @@
 public class TokenMetadata
 {
     /* Maintains token to endpoint map of every node in the cluster. */
-    private Map<BigInteger, EndPoint> tokenToEndPointMap_ = new HashMap<BigInteger, EndPoint>();    
+    private Map<Token, EndPoint> tokenToEndPointMap_ = new HashMap<Token, EndPoint>();
     /* Maintains a reverse index of endpoint to token in the cluster. */
-    private Map<EndPoint, BigInteger> endPointToTokenMap_ = new HashMap<EndPoint, BigInteger>();
+    private Map<EndPoint, Token> endPointToTokenMap_ = new HashMap<EndPoint, Token>();
     
     /* Use this lock for manipulating the token map */
     private final ReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -51,7 +46,7 @@
     {
     }
 
-    private TokenMetadata(Map<BigInteger, EndPoint> tokenToEndPointMap, Map<EndPoint, BigInteger> endPointToTokenMap)
+    private TokenMetadata(Map<Token, EndPoint> tokenToEndPointMap, Map<EndPoint, Token> endPointToTokenMap)
     {
         tokenToEndPointMap_ = tokenToEndPointMap;
         endPointToTokenMap_ = endPointToTokenMap;
@@ -59,20 +54,18 @@
     
     public TokenMetadata cloneMe()
     {
-        Map<BigInteger, EndPoint> tokenToEndPointMap = cloneTokenEndPointMap();
-        Map<EndPoint, BigInteger> endPointToTokenMap = cloneEndPointTokenMap();
-        return new TokenMetadata( tokenToEndPointMap, endPointToTokenMap );
+        return new TokenMetadata(cloneTokenEndPointMap(), cloneEndPointTokenMap());
     }
     
     /**
      * Update the two maps in an safe mode. 
     */
-    public void update(BigInteger token, EndPoint endpoint)
+    public void update(Token token, EndPoint endpoint)
     {
         lock_.writeLock().lock();
         try
         {            
-            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            Token oldToken = endPointToTokenMap_.get(endpoint);
             if ( oldToken != null )
                 tokenToEndPointMap_.remove(oldToken);
             tokenToEndPointMap_.put(token, endpoint);
@@ -93,7 +86,7 @@
         lock_.writeLock().lock();
         try
         {            
-            BigInteger oldToken = endPointToTokenMap_.get(endpoint);
+            Token oldToken = endPointToTokenMap_.get(endpoint);
             if ( oldToken != null )
                 tokenToEndPointMap_.remove(oldToken);            
             endPointToTokenMap_.remove(endpoint);
@@ -104,7 +97,7 @@
         }
     }
     
-    public BigInteger getToken(EndPoint endpoint)
+    public Token getToken(EndPoint endpoint)
     {
         lock_.readLock().lock();
         try
@@ -133,12 +126,12 @@
     /*
      * Returns a safe clone of tokenToEndPointMap_.
     */
-    public Map<BigInteger, EndPoint> cloneTokenEndPointMap()
+    public Map<Token, EndPoint> cloneTokenEndPointMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<BigInteger, EndPoint>( tokenToEndPointMap_ );
+            return new HashMap<Token, EndPoint>( tokenToEndPointMap_ );
         }
         finally
         {
@@ -149,12 +142,12 @@
     /*
      * Returns a safe clone of endPointTokenMap_.
     */
-    public Map<EndPoint, BigInteger> cloneEndPointTokenMap()
+    public Map<EndPoint, Token> cloneEndPointTokenMap()
     {
         lock_.readLock().lock();
         try
         {            
-            return new HashMap<EndPoint, BigInteger>( endPointToTokenMap_ );
+            return new HashMap<EndPoint, Token>( endPointToTokenMap_ );
         }
         finally
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Fri Mar 27 02:44:44 2009
@@ -19,33 +19,30 @@
 package org.apache.cassandra.service;
 
 import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
-import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
-import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.utils.*;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -164,7 +161,6 @@
             if ( isMoveable_.get() )
             {
                 MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
-                BigInteger targetToken = moveMessage.getTargetToken();
                 /* Start the leave operation and join the ring at the position specified */
                 isMoveable_.set(false);
             }
@@ -396,18 +392,18 @@
 
 class MoveMessage implements Serializable
 {
-    private BigInteger targetToken_;
+    private Token targetToken_;
 
     private MoveMessage()
     {
     }
 
-    MoveMessage(BigInteger targetToken)
+    MoveMessage(Token targetToken)
     {
         targetToken_ = targetToken;
     }
 
-    BigInteger getTargetToken()
+    Token getTargetToken()
     {
         return targetToken_;
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Fri Mar 27 02:44:44 2009
@@ -18,19 +18,31 @@
 
 package org.apache.cassandra.service;
 
-import java.util.*;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.io.*;
-import java.lang.management.ManagementFactory;
-import java.math.BigInteger;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.log4j.Logger;
+
 import org.apache.cassandra.analytics.AnalyticsContext;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.MultiThreadedStage;
@@ -55,7 +67,11 @@
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.BootstrapInitiateMessage;
 import org.apache.cassandra.dht.BootstrapMetadataVerbHandler;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.FailureDetector;
@@ -77,16 +93,12 @@
 import org.apache.cassandra.tools.TokenUpdateVerbHandler;
 import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.commons.math.linear.RealMatrix;
-import org.apache.commons.math.linear.RealMatrixImpl;
-import org.apache.log4j.Logger;
-
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 /*
@@ -126,7 +138,7 @@
     public final static String bsMetadataVerbHandler_ = "BS-METADATA-VERB-HANDLER";
     public final static String calloutDeployVerbHandler_ = "CALLOUT-DEPLOY-VERB-HANDLER";
     public final static String touchVerbHandler_ = "TOUCH-VERB-HANDLER";
-    
+
     public static enum ConsistencyLevel
     {
     	WEAK,
@@ -160,9 +172,9 @@
      * function used by the system for
      * partitioning.
     */
-    public static BigInteger hash(String key)
+    public static Token token(String key)
     {
-        return partitioner_.hash(key);
+        return partitioner_.getTokenForKey(key);
     }
 
     public static IPartitioner getPartitioner() {
@@ -424,7 +436,7 @@
         String hashingStrategy = DatabaseDescriptor.getHashingStrategy();
         if (DatabaseDescriptor.ophf_.equalsIgnoreCase(hashingStrategy))
         {
-            partitioner_ = new OrderPreservingHashPartitioner();
+            partitioner_ = new OrderPreservingPartitioner();
         }        
         else
         {
@@ -525,7 +537,7 @@
     }
 
     /* TODO: remove later */
-    public void updateTokenMetadata(BigInteger token, EndPoint endpoint)
+    public void updateTokenMetadata(Token token, EndPoint endpoint)
     {
         tokenMetadata_.update(token, endpoint);
     }
@@ -571,8 +583,8 @@
     public Map<Range, List<EndPoint>> getRangeToEndPointMap()
     {
         /* Get the token to endpoint map. */
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        Set<BigInteger> tokens = tokenToEndPointMap.keySet();
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Set<Token> tokens = tokenToEndPointMap.keySet();
         /* All the ranges for the tokens */
         Range[] ranges = getAllRanges(tokens);
         Map<Range, List<EndPoint>> oldRangeToEndPointMap = constructRangeToEndPointMap(ranges);
@@ -605,7 +617,7 @@
      * @param tokenToEndPointMap mapping of token to endpoints.
      * @return mapping of ranges to the replicas responsible for them.
     */
-    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    public Map<Range, List<EndPoint>> constructRangeToEndPointMap(Range[] ranges, Map<Token, EndPoint> tokenToEndPointMap)
     {
         logger_.debug("Constructing range to endpoint map ...");
         Map<Range, List<EndPoint>> rangeToEndPointMap = new HashMap<Range, List<EndPoint>>();
@@ -627,7 +639,7 @@
     public Map<EndPoint, List<Range>> constructEndPointToRangesMap()
     {
         Map<EndPoint, List<Range>> endPointToRangesMap = new HashMap<EndPoint, List<Range>>();
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
         Collection<EndPoint> mbrs = tokenToEndPointMap.values();
         for ( EndPoint mbr : mbrs )
         {
@@ -648,9 +660,9 @@
         ApplicationState nodeIdState = epState.getApplicationState(StorageService.nodeId_);
         if (nodeIdState != null)
         {
-            BigInteger newToken = new BigInteger(nodeIdState.getState());
+            Token newToken = getPartitioner().getTokenFactory().fromString(nodeIdState.getState());
             logger_.debug("CHANGE IN STATE FOR " + endpoint + " - has token " + nodeIdState.getState());
-            BigInteger oldToken = tokenMetadata_.getToken(ep);
+            Token oldToken = tokenMetadata_.getToken(ep);
 
             if ( oldToken != null )
             {
@@ -732,7 +744,7 @@
      * This method updates the token on disk and modifies the cached
      * StorageMetadata instance. This is only for the local endpoint.
     */
-    public void updateToken(BigInteger token) throws IOException
+    public void updateToken(Token token) throws IOException
     {
         /* update the token on disk */
         SystemTable.openSystemTable(SystemTable.name_).updateToken(token);
@@ -773,12 +785,12 @@
     {
     	if ( keys.length > 0 )
     	{
-            BigInteger token = tokenMetadata_.getToken(StorageService.tcpAddr_);
-	        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-	        BigInteger[] tokens = tokenToEndPointMap.keySet().toArray( new BigInteger[0] );
+            Token token = tokenMetadata_.getToken(StorageService.tcpAddr_);
+	        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+	        Token[] tokens = tokenToEndPointMap.keySet().toArray(new Token[tokenToEndPointMap.keySet().size()]);
 	        Arrays.sort(tokens);
 	        int index = Arrays.binarySearch(tokens, token) * (keys.length/tokens.length);
-	        BigInteger newToken = hash( keys[index] );
+	        Token newToken = token( keys[index] );
 	        /* update the token */
 	        updateToken(newToken);
     	}
@@ -816,7 +828,7 @@
         }        
         String[] allNodes = nodesToLoad.split(":");
         EndPoint[] endpoints = new EndPoint[allNodes.length];
-        BigInteger[] tokens = new BigInteger[allNodes.length];
+        Token[] tokens = new Token[allNodes.length];
         
         for ( int i = 0; i < allNodes.length; ++i )
         {
@@ -852,8 +864,8 @@
         switch ( mode )
         {
             case FULL:
-                BigInteger token = tokenMetadata_.getToken(endpoint);
-                bootStrapper_.submit( new BootStrapper(new EndPoint[]{endpoint}, new BigInteger[]{token}) );
+                Token token = tokenMetadata_.getToken(endpoint);
+                bootStrapper_.submit(new BootStrapper(new EndPoint[]{endpoint}, token));
                 break;
 
             case HINT:
@@ -871,26 +883,14 @@
     public String getToken(EndPoint ep)
     {
         EndPoint ep2 = new EndPoint(ep.getHost(), DatabaseDescriptor.getStoragePort());
-        BigInteger token = tokenMetadata_.getToken(ep2);
-        return ( token == null ) ? BigInteger.ZERO.toString() : token.toString();
+        Token token = tokenMetadata_.getToken(ep2);
+        return ( token == null ) ? "" : token.toString();
     }
 
     public String getToken()
     {
         return tokenMetadata_.getToken(StorageService.tcpAddr_).toString();
     }
-    
-    public void updateToken(String token)
-    {
-        try
-        {
-            updateToken(new BigInteger(token));
-        }
-        catch ( IOException ex )
-        {
-            logger_.debug(LogUtil.throwableToString(ex));
-        }
-    }
 
     public String getLiveNodes()
     {
@@ -973,9 +973,9 @@
      */
     EndPoint getPredecessor(EndPoint ep)
     {
-        BigInteger token = tokenMetadata_.getToken(ep);
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Token token = tokenMetadata_.getToken(ep);
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         EndPoint predecessor = (index == 0) ? tokenToEndPointMap.get(tokens
@@ -990,9 +990,9 @@
      */
     public EndPoint getSuccessor(EndPoint ep)
     {
-        BigInteger token = tokenMetadata_.getToken(ep);
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Token token = tokenMetadata_.getToken(ep);
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         Collections.sort(tokens);
         int index = Collections.binarySearch(tokens, token);
         EndPoint successor = (index == (tokens.size() - 1)) ? tokenToEndPointMap
@@ -1008,9 +1008,9 @@
      */
     public Range getPrimaryRangeForEndPoint(EndPoint ep)
     {
-        BigInteger right = tokenMetadata_.getToken(ep);
+        Token right = tokenMetadata_.getToken(ep);
         EndPoint predecessor = getPredecessor(ep);
-        BigInteger left = tokenMetadata_.getToken(predecessor);
+        Token left = tokenMetadata_.getToken(predecessor);
         return new Range(left, right);
     }
     
@@ -1041,10 +1041,10 @@
      * ranges.
      * @return ranges in sorted order
     */
-    public Range[] getAllRanges(Set<BigInteger> tokens)
+    public Range[] getAllRanges(Set<Token> tokens)
     {
         List<Range> ranges = new ArrayList<Range>();
-        List<BigInteger> allTokens = new ArrayList<BigInteger>(tokens);
+        List<Token> allTokens = new ArrayList<Token>(tokens);
         Collections.sort(allTokens);
         int size = allTokens.size();
         for ( int i = 1; i < size; ++i )
@@ -1067,9 +1067,9 @@
     public EndPoint getPrimary(String key)
     {
         EndPoint endpoint = StorageService.tcpAddr_;
-        BigInteger token = hash(key);
-        Map<BigInteger, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
-        List<BigInteger> tokens = new ArrayList<BigInteger>(tokenToEndPointMap.keySet());
+        Token token = token(key);
+        Map<Token, EndPoint> tokenToEndPointMap = tokenMetadata_.cloneTokenEndPointMap();
+        List tokens = new ArrayList<Token>(tokenToEndPointMap.keySet());
         if (tokens.size() > 0)
         {
             Collections.sort(tokens);
@@ -1115,7 +1115,7 @@
      */
     public EndPoint[] getNStorageEndPoint(String key)
     {
-        BigInteger token = hash(key);
+        Token token = token(key);
         return nodePicker_.getStorageEndPoints(token);
     }
 
@@ -1150,7 +1150,7 @@
      */
     public Map<EndPoint, EndPoint> getNStorageEndPointMap(String key)
     {
-        BigInteger token = hash(key);
+        Token token = token(key);
         return nodePicker_.getHintedStorageEndPoints(token);
     }
 
@@ -1160,7 +1160,7 @@
      *
      * param @ token - position on the ring
      */
-    public EndPoint[] getNStorageEndPoint(BigInteger token)
+    public EndPoint[] getNStorageEndPoint(Token token)
     {
         return nodePicker_.getStorageEndPoints(token);
     }
@@ -1173,7 +1173,7 @@
      * param @ token - position on the ring
      * param @ tokens - w/o the following tokens in the token list
      */
-    protected EndPoint[] getNStorageEndPoint(BigInteger token, Map<BigInteger, EndPoint> tokenToEndPointMap)
+    protected EndPoint[] getNStorageEndPoint(Token token, Map<Token, EndPoint> tokenToEndPointMap)
     {
         return nodePicker_.getStorageEndPoints(token, tokenToEndPointMap);
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java Fri Mar 27 02:44:44 2009
@@ -44,11 +44,6 @@
     public void loadAll(String nodes);
     
     /**
-     * This method is used only for debug purpose.  
-    */
-    public void updateToken(String token);    
-    
-    /**
      * 
      */
     public void doGC();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -19,12 +19,13 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
-import java.math.BigInteger;
 
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -37,7 +38,7 @@
     public void doVerb(Message message)
     {
     	byte[] body = (byte[])message.getMessageBody()[0];
-        BigInteger token = new BigInteger(body);
+        Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(body);
         try
         {
         	logger_.info("Updating the token to [" + token + "]");

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/MembershipCleanerVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -18,25 +18,20 @@
 
 package org.apache.cassandra.tools;
 
-import java.util.*;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigInteger;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-import org.apache.cassandra.io.*;
-import org.apache.cassandra.config.*;
 
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdateVerbHandler.java Fri Mar 27 02:44:44 2009
@@ -21,20 +21,20 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tools.TokenUpdater.TokenInfoMessage;
 import org.apache.cassandra.utils.LogUtil;
 
 /**
@@ -54,9 +54,8 @@
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             /* Deserialize to get the token for this endpoint. */
-            TokenUpdater.TokenInfoMessage tiMessage = TokenUpdater.TokenInfoMessage.serializer().deserialize(bufIn);
-            
-            BigInteger token = tiMessage.getToken();
+            Token token = Token.serializer().deserialize(bufIn);
+
             logger_.info("Updating the token to [" + token + "]");
             StorageService.instance().updateToken(token);
             
@@ -66,19 +65,19 @@
             logger_.debug("Number of nodes in the header " + headers.size());
             Set<String> nodes = headers.keySet();
             
+            IPartitioner p = StorageService.getPartitioner();
             for ( String node : nodes )
             {            
                 logger_.debug("Processing node " + node);
                 byte[] bytes = headers.remove(node);
                 /* Send a message to this node to update its token to the one retreived. */
                 EndPoint target = new EndPoint(node, DatabaseDescriptor.getStoragePort());
-                token = new BigInteger(bytes);
+                token = p.getTokenFactory().fromByteArray(bytes);
                 
-                /* Reset the new TokenInfoMessage */
-                tiMessage = new TokenUpdater.TokenInfoMessage(target, token );
+                /* Reset the new Message */
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(bos);
-                TokenInfoMessage.serializer().serialize(tiMessage, dos);
+                Token.serializer().serialize(token, dos);
                 message.setMessageBody(new Object[]{bos.toByteArray()});
                 
                 logger_.debug("Sending a token update message to " + target + " to update it to " + token);

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/tools/TokenUpdater.java Fri Mar 27 02:44:44 2009
@@ -20,16 +20,12 @@
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -50,16 +46,17 @@
         }
         
         String ipPort = args[0];
-        String token = args[1];
+        IPartitioner p = StorageService.getPartitioner();
+        Token token = p.getTokenFactory().fromString(args[1]);
         String file = args[2];
         
         String[] ipPortPair = ipPort.split(":");
         EndPoint target = new EndPoint(ipPortPair[0], Integer.valueOf(ipPortPair[1]));
-        TokenInfoMessage tiMessage = new TokenInfoMessage( target, new BigInteger(token) );
-        
+
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        TokenInfoMessage.serializer().serialize(tiMessage, dos);
+        Token.serializer().serialize(token, dos);
+
         /* Construct the token update message to be sent */
         Message tokenUpdateMessage = new Message( new EndPoint(FBUtilities.getHostName(), port_), "", StorageService.tokenVerbHandler_, new Object[]{bos.toByteArray()} );
         
@@ -70,8 +67,8 @@
         {
             String[] nodeTokenPair = line.split(" ");
             /* Add the node and the token pair into the header of this message. */
-            BigInteger nodeToken = new BigInteger(nodeTokenPair[1]);
-            tokenUpdateMessage.addHeader(nodeTokenPair[0], nodeToken.toByteArray());
+            Token nodeToken = p.getTokenFactory().fromString(nodeTokenPair[1]);
+            tokenUpdateMessage.addHeader(nodeTokenPair[0], p.getTokenFactory().toByteArray(nodeToken));
         }
         
         System.out.println("Sending a token update message to " + target);
@@ -79,64 +76,5 @@
         Thread.sleep(TokenUpdater.waitTime_);
         System.out.println("Done sending the update message");
     }
-    
-    public static class TokenInfoMessage implements Serializable
-    {
-        private static ICompactSerializer<TokenInfoMessage> serializer_;
-        private static AtomicInteger idGen_ = new AtomicInteger(0);
-        
-        static
-        {
-            serializer_ = new TokenInfoMessageSerializer();            
-        }
-        
-        static ICompactSerializer<TokenInfoMessage> serializer()
-        {
-            return serializer_;
-        }
 
-        private EndPoint target_;
-        private BigInteger token_;
-        
-        TokenInfoMessage(EndPoint target, BigInteger token)
-        {
-            target_ = target;
-            token_ = token;
-        }
-        
-        EndPoint getTarget()
-        {
-            return target_;
-        }
-        
-        BigInteger getToken()
-        {
-            return token_;
-        }
-    }
-    
-    public static class TokenInfoMessageSerializer implements ICompactSerializer<TokenInfoMessage>
-    {
-        public void serialize(TokenInfoMessage tiMessage, DataOutputStream dos) throws IOException
-        {
-            byte[] node = EndPoint.toBytes( tiMessage.getTarget() );
-            dos.writeInt(node.length);
-            dos.write(node);
-            
-            byte[] token = tiMessage.getToken().toByteArray();
-            dos.writeInt( token.length );
-            dos.write(token);
-        }
-        
-        public TokenInfoMessage deserialize(DataInputStream dis) throws IOException
-        {
-            byte[] target = new byte[dis.readInt()];
-            dis.readFully(target);
-            
-            byte[] token = new byte[dis.readInt()];
-            dis.readFully(token);
-            
-            return new TokenInfoMessage(EndPoint.fromBytes(target), new BigInteger(token));
-        }
-    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/utils/FastObjectHash.java Fri Mar 27 02:44:44 2009
@@ -187,7 +187,7 @@
         }
         else
         { // already FULL or REMOVED, must probe
-            // compute the double hash
+            // compute the double token
             final int probe = 1 + (hash % (length - 2));
 
             // if the slot we landed on is FULL (but not removed), probe

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java?rev=758999&r1=758998&r2=758999&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/SystemTableTest.java Fri Mar 27 02:44:44 2009
@@ -9,6 +9,6 @@
 public class SystemTableTest extends ServerTest {
     @Test
     public void testMain() throws IOException {
-        SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.hash("503545744:0") );
+        SystemTable.openSystemTable(SystemTable.cfName_).updateToken( StorageService.token("503545744:0") );
     }
 }



Mime
View raw message