cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1208993 [2/4] - in /cassandra/trunk: ./ src/java/org/apache/cassandra/client/ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/cql/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/...
Date Thu, 01 Dec 2011 08:31:43 GMT
Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/Range.java Thu Dec  1 08:31:20 2011
@@ -25,6 +25,7 @@ import java.util.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.commons.lang.ObjectUtils;
 
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -33,21 +34,21 @@ import org.apache.cassandra.utils.FBUtil
  *
  * A Range is responsible for the tokens between (left, right].
  */
-public class Range extends AbstractBounds implements Comparable<Range>, Serializable
+public class Range<T extends RingPosition> extends AbstractBounds<T> implements Comparable<Range<T>>, Serializable
 {
     public static final long serialVersionUID = 1L;
     
-    public Range(Token left, Token right)
+    public Range(T left, T right)
     {
         this(left, right, StorageService.getPartitioner());
     }
 
-    public Range(Token left, Token right, IPartitioner partitioner)
+    public Range(T left, T right, IPartitioner partitioner)
     {
         super(left, right, partitioner);
     }
 
-    public static boolean contains(Token left, Token right, Token bi)
+    public static <T extends RingPosition> boolean contains(T left, T right, T bi)
     {
         if (isWrapAround(left, right))
         {
@@ -68,11 +69,11 @@ public class Range extends AbstractBound
             /*
              * This is the range (a, b] where a < b. 
              */
-            return ( compare(bi,left) > 0 && compare(right,bi) >= 0);
+            return bi.compareTo(left) > 0 && right.compareTo(bi) >= 0;
         }
     }
 
-    public boolean contains(Range that)
+    public boolean contains(Range<T> that)
     {
         if (this.left.equals(this.right))
         {
@@ -84,13 +85,13 @@ public class Range extends AbstractBound
         boolean thatwraps = isWrapAround(that.left, that.right);
         if (thiswraps == thatwraps)
         {
-            return compare(left,that.left) <= 0 && compare(that.right,right) <= 0;
+            return left.compareTo(that.left) <= 0 && that.right.compareTo(right) <= 0;
         }
         else if (thiswraps)
         {
             // wrapping might contain non-wrapping
             // that is contained if both its tokens are in one of our wrap segments
-            return compare(left,that.left) <= 0 || compare(that.right,right) <= 0;
+            return left.compareTo(that.left) <= 0 || that.right.compareTo(right) <= 0;
         }
         else
         {
@@ -106,7 +107,7 @@ public class Range extends AbstractBound
      * @param bi point in question
      * @return true if the point contains within the range else false.
      */
-    public boolean contains(Token bi)
+    public boolean contains(T bi)
     {
         return contains(left, right, bi);
     }
@@ -115,14 +116,19 @@ public class Range extends AbstractBound
      * @param that range to check for intersection
      * @return true if the given range intersects with this range.
      */
-    public boolean intersects(Range that)
+    public boolean intersects(Range<T> that)
     {
         return intersectionWith(that).size() > 0;
     }
 
-    public static Set<Range> rangeSet(Range ... ranges)
+    public static <T extends RingPosition> Set<Range<T>> rangeSet(Range<T> ... ranges)
     {
-        return Collections.unmodifiableSet(new HashSet<Range>(Arrays.asList(ranges)));
+        return Collections.unmodifiableSet(new HashSet<Range<T>>(Arrays.asList(ranges)));
+    }
+
+    public static <T extends RingPosition> Set<Range<T>> rangeSet(Range<T> range)
+    {
+        return Collections.singleton(range);
     }
 
     /**
@@ -131,7 +137,7 @@ public class Range extends AbstractBound
      * say you have nodes G and M, with query range (D,T]; the intersection is (M-T] and (D-G].
      * If there is no intersection, an empty list is returned.
      */
-    public Set<Range> intersectionWith(Range that)
+    public Set<Range<T>> intersectionWith(Range<T> that)
     {
         if (that.contains(this))
             return rangeSet(this);
@@ -145,9 +151,9 @@ public class Range extends AbstractBound
             // neither wraps.  the straightforward case.
             if (!(left.compareTo(that.right) < 0 && that.left.compareTo(right) < 0))
                 return Collections.emptySet();
-            return rangeSet(new Range((Token)ObjectUtils.max(this.left, that.left),
-                                      (Token)ObjectUtils.min(this.right, that.right),
-                                      partitioner));
+            return rangeSet(new Range<T>((T)ObjectUtils.max(this.left, that.left),
+                                         (T)ObjectUtils.min(this.right, that.right),
+                                         partitioner));
         }
         if (thiswraps && thatwraps)
         {
@@ -171,82 +177,53 @@ public class Range extends AbstractBound
         return intersectionOneWrapping(that, this);
     }
 
-    private static Set<Range> intersectionBothWrapping(Range first, Range that)
+    private static <T extends RingPosition> Set<Range<T>> intersectionBothWrapping(Range<T> first, Range<T> that)
     {
-        Set<Range> intersection = new HashSet<Range>(2);
+        Set<Range<T>> intersection = new HashSet<Range<T>>(2);
         if (that.right.compareTo(first.left) > 0)
-            intersection.add(new Range(first.left, that.right, first.partitioner));
-        intersection.add(new Range(that.left, first.right, first.partitioner));
+            intersection.add(new Range<T>(first.left, that.right, first.partitioner));
+        intersection.add(new Range<T>(that.left, first.right, first.partitioner));
         return Collections.unmodifiableSet(intersection);
     }
 
-    private static Set<Range> intersectionOneWrapping(Range wrapping, Range other)
+    private static <T extends RingPosition> Set<Range<T>> intersectionOneWrapping(Range<T> wrapping, Range<T> other)
     {
-        Set<Range> intersection = new HashSet<Range>(2);
+        Set<Range<T>> intersection = new HashSet<Range<T>>(2);
         if (other.contains(wrapping.right))
-            intersection.add(new Range(other.left, wrapping.right, wrapping.partitioner));
+            intersection.add(new Range<T>(other.left, wrapping.right, wrapping.partitioner));
         // need the extra compareto here because ranges are asymmetrical; wrapping.left _is not_ contained by the wrapping range
         if (other.contains(wrapping.left) && wrapping.left.compareTo(other.right) < 0)
-            intersection.add(new Range(wrapping.left, other.right, wrapping.partitioner));
+            intersection.add(new Range<T>(wrapping.left, other.right, wrapping.partitioner));
         return Collections.unmodifiableSet(intersection);
     }
 
-    public AbstractBounds createFrom(Token token)
+    public AbstractBounds<T> createFrom(T pos)
     {
-        if (token.equals(left))
+        if (pos.equals(left))
             return null;
-        return new Range(left, token, partitioner);
+        return new Range<T>(left, pos, partitioner);
     }
 
-    public List<AbstractBounds> unwrap()
+    public List<? extends AbstractBounds<T>> unwrap()
     {
-        if (!isWrapAround() || right.equals(partitioner.getMinimumToken()))
-            return (List)Arrays.asList(this);
-        List<AbstractBounds> unwrapped = new ArrayList<AbstractBounds>(2);
-        unwrapped.add(new Range(left, partitioner.getMinimumToken(), partitioner));
-        unwrapped.add(new Range(partitioner.getMinimumToken(), right, partitioner));
+        T minValue = (T) partitioner.minValue(right.getClass());
+        if (!isWrapAround() || right.equals(minValue))
+            return Arrays.asList(this);
+        List<AbstractBounds<T>> unwrapped = new ArrayList<AbstractBounds<T>>(2);
+        unwrapped.add(new Range<T>(left, minValue, partitioner));
+        unwrapped.add(new Range<T>(minValue, right, partitioner));
         return unwrapped;
     }
 
     /**
      * Tells if the given range is a wrap around.
      */
-    public static boolean isWrapAround(Token left, Token right)
+    public static <T extends RingPosition> boolean isWrapAround(T left, T right)
     {
-       return compare(left,right) >= 0;           
+       return left.compareTo(right) >= 0;
     }
-    
-    public static int compare(Token left, Token right)
-    {
-        ByteBuffer l,r;
 
-        if (left.token instanceof byte[])
-        {
-            l  = ByteBuffer.wrap((byte[]) left.token);
-        }
-        else if (left.token instanceof ByteBuffer)
-        {
-            l  = (ByteBuffer) left.token;
-        }
-        else
-        {
-            //Handles other token types
-            return left.compareTo(right);
-        }
-
-        if (right.token instanceof byte[])
-        {
-            r  = ByteBuffer.wrap((byte[]) right.token);
-        }
-        else
-        {
-            r  = (ByteBuffer) right.token;
-        }
-
-        return ByteBufferUtil.compareUnsigned(l, r);
-     }
-    
-    public int compareTo(Range rhs)
+    public int compareTo(Range<T> rhs)
     {
         /* 
          * If the range represented by the "this" pointer
@@ -258,7 +235,7 @@ public class Range extends AbstractBound
         if ( isWrapAround(rhs.left, rhs.right) )
             return 1;
         
-        return compare(right,rhs.right);
+        return right.compareTo(rhs.right);
     }
 
     /**
@@ -268,14 +245,14 @@ public class Range extends AbstractBound
      * @return An ArrayList of the Ranges left after subtracting contained
      * from this.
      */
-    private ArrayList<Range> subtractContained(Range contained)
+    private ArrayList<Range<T>> subtractContained(Range<T> contained)
     {
-        ArrayList<Range> difference = new ArrayList<Range>();
+        ArrayList<Range<T>> difference = new ArrayList<Range<T>>();
 
         if (!left.equals(contained.left))
-            difference.add(new Range(left, contained.left, partitioner));
+            difference.add(new Range<T>(left, contained.left, partitioner));
         if (!right.equals(contained.right))
-            difference.add(new Range(contained.right, right, partitioner));
+            difference.add(new Range<T>(contained.right, right, partitioner));
         return difference;
     }
 
@@ -287,13 +264,13 @@ public class Range extends AbstractBound
      * @param rhs range to calculate difference
      * @return set of difference ranges
      */
-    public Set<Range> differenceToFetch(Range rhs)
+    public Set<Range<T>> differenceToFetch(Range<T> rhs)
     {
-        Set<Range> result;
-        Set<Range> intersectionSet = this.intersectionWith(rhs);
+        Set<Range<T>> result;
+        Set<Range<T>> intersectionSet = this.intersectionWith(rhs);
         if (intersectionSet.isEmpty())
         {
-            result = new HashSet<Range>();
+            result = new HashSet<Range<T>>();
             result.add(rhs);
         }
         else
@@ -302,29 +279,29 @@ public class Range extends AbstractBound
             intersectionSet.toArray(intersections);
             if (intersections.length == 1)
             {
-                result = new HashSet<Range>(rhs.subtractContained(intersections[0]));
+                result = new HashSet<Range<T>>(rhs.subtractContained(intersections[0]));
             }
             else
             {
                 // intersections.length must be 2
-                Range first = intersections[0];
-                Range second = intersections[1];
-                ArrayList<Range> temp = rhs.subtractContained(first);
+                Range<T> first = intersections[0];
+                Range<T> second = intersections[1];
+                ArrayList<Range<T>> temp = rhs.subtractContained(first);
 
                 // Because there are two intersections, subtracting only one of them
                 // will yield a single Range.
-                Range single = temp.get(0);
-                result = new HashSet<Range>(single.subtractContained(second));
+                Range<T> single = temp.get(0);
+                result = new HashSet<Range<T>>(single.subtractContained(second));
             }
         }
         return result;
     }
 
-    public static boolean isTokenInRanges(Token token, Iterable<Range> ranges)
+    public static <T extends RingPosition> boolean isInRanges(T token, Iterable<Range<T>> ranges)
     {
         assert ranges != null;
 
-        for (Range range : ranges)
+        for (Range<T> range : ranges)
         {
             if (range.contains(token))
             {
@@ -334,14 +311,16 @@ public class Range extends AbstractBound
         return false;
     }
 
+    @Override
     public boolean equals(Object o)
     {
         if (!(o instanceof Range))
             return false;
-        Range rhs = (Range)o;
-        return compare(left,rhs.left) == 0 && compare(right,rhs.right) == 0;
+        Range<T> rhs = (Range<T>)o;
+        return left.equals(rhs.left) && right.equals(rhs.right);
     }
-    
+
+    @Override
     public String toString()
     {
         return "(" + left + "," + right + "]";
@@ -351,4 +330,22 @@ public class Range extends AbstractBound
     {
         return isWrapAround(left, right);
     }
+
+    /**
+     * Compute a range of keys corresponding to a given range of tokens.
+     */
+    public static Range<RowPosition> makeRowRange(Token left, Token right, IPartitioner partitioner)
+    {
+        return new Range<RowPosition>(left.maxKeyBound(partitioner), right.maxKeyBound(partitioner), partitioner);
+    }
+
+    public AbstractBounds<RowPosition> toRowBounds()
+    {
+        return (left instanceof Token) ? makeRowRange((Token)left, (Token)right, partitioner) : (Range<RowPosition>)this;
+    }
+
+    public AbstractBounds<Token> toTokenBounds()
+    {
+        return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this;
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/dht/RingPosition.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/RingPosition.java?rev=1208993&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/RingPosition.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/RingPosition.java Thu Dec  1 08:31:20 2011
@@ -0,0 +1,39 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.dht;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Interface representing a position on the ring.
+ * Both Token and DecoratedKey represent a position in the ring, a token being
+ * less precise than a DecoratedKey (a token is really a range of keys).
+ */
+public interface RingPosition<T> extends Comparable<T>
+{
+    public Token getToken();
+    public boolean isMinimum(IPartitioner partitioner);
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/Token.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/Token.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/Token.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/Token.java Thu Dec  1 08:31:20 2011
@@ -25,11 +25,12 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-public abstract class Token<T> implements Comparable<Token<T>>, Serializable
+public abstract class Token<T> implements RingPosition<Token<T>>, Serializable
 {
     private static final long serialVersionUID = 1L;
 
@@ -41,6 +42,9 @@ public abstract class Token<T> implement
 
     public final T token;
 
+    private final transient KeyBound minimumBound = new KeyBound(true);
+    private final transient KeyBound maximumBound = new KeyBound(false);
+
     protected Token(T token)
     {
         this.token = token;
@@ -51,19 +55,24 @@ public abstract class Token<T> implement
      */
     abstract public int compareTo(Token<T> o);
 
+    @Override
     public String toString()
     {
         return token.toString();
     }
 
+    @Override
     public boolean equals(Object obj)
     {
-        if (!(obj instanceof Token)) {
+        if (this == obj)
+            return true;
+        if (obj == null || this.getClass() != obj.getClass())
             return false;
-        }
-        return token.equals(((Token)obj).token);
+
+        return token.equals(((Token<T>)obj).token);
     }
 
+    @Override
     public int hashCode()
     {
         return token.hashCode();
@@ -102,4 +111,132 @@ public abstract class Token<T> implement
             throw new UnsupportedOperationException();
         }
     }
+
+    public Token<T> getToken()
+    {
+        return this;
+    }
+
+    public boolean isMinimum(IPartitioner partitioner)
+    {
+        return this.equals(partitioner.getMinimumToken());
+    }
+
+    public boolean isMinimum()
+    {
+        return isMinimum(StorageService.getPartitioner());
+    }
+
+    /*
+     * A token corresponds to the range of all the keys having this token.
+     * A token is thus not directly comparable to a key. But to be able to select
+     * keys given tokens, we introduce two "fake" keys for each token T:
+     *   - lowerBoundKey: a "fake" key representing the lower bound T represents.
+     *                    In other words, lowerBoundKey is the smallest key that
+     *                    has token T.
+     *   - upperBoundKey: a "fake" key representing the upper bound T represents.
+     *                    In other words, upperBoundKey is the largest key that
+     *                    has token T.
+     *
+     * Note that those are "fake" keys and should only be used for comparison
+     * of other keys, for selection of keys when only a token is known.
+     */
+    public KeyBound minKeyBound(IPartitioner partitioner)
+    {
+        return minimumBound;
+    }
+
+    public KeyBound minKeyBound()
+    {
+        return minKeyBound(null);
+    }
+
+    public KeyBound maxKeyBound(IPartitioner partitioner)
+    {
+        /*
+         * For each token, we need both minKeyBound and maxKeyBound
+         * because a token corresponds to a range of keys. But the minimum
+         * token corresponds to no key, so it is valid and actually much
+         * simpler to associate the same value for minKeyBound and
+         * maxKeyBound for the minimum token.
+         */
+        if (isMinimum(partitioner))
+            return minimumBound;
+        return maximumBound;
+    }
+
+    public KeyBound maxKeyBound()
+    {
+        return maxKeyBound(StorageService.getPartitioner());
+    }
+
+    public <T extends RingPosition> T asSplitValue(Class<T> klass)
+    {
+        if (klass.equals(getClass()))
+            return (T)this;
+        else
+            return (T)maxKeyBound();
+    }
+
+    public class KeyBound extends RowPosition
+    {
+        public final boolean isMinimumBound;
+
+        private KeyBound(boolean isMinimumBound)
+        {
+            this.isMinimumBound = isMinimumBound;
+        }
+
+        public Token getToken()
+        {
+            return Token.this;
+        }
+
+        public int compareTo(RowPosition pos)
+        {
+            if (this == pos)
+                return 0;
+
+            int cmp = getToken().compareTo(pos.getToken());
+            if (cmp != 0)
+                return cmp;
+
+            // We've already eliminated the == case
+            return isMinimumBound ? -1 : 1;
+        }
+
+        public boolean isMinimum(IPartitioner partitioner)
+        {
+            return getToken().isMinimum(partitioner);
+        }
+
+        public RowPosition.Kind kind()
+        {
+            return isMinimumBound ? RowPosition.Kind.MIN_BOUND : RowPosition.Kind.MAX_BOUND;
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (this == obj)
+                return true;
+            if (obj == null || this.getClass() != obj.getClass())
+                return false;
+
+            KeyBound other = (KeyBound)obj;
+            return getToken().equals(other.getToken());
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return getToken().hashCode() + (isMinimumBound ? 0 : 1);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("%s(%s)", isMinimumBound ? "min" : "max", getToken().toString());
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java Thu Dec  1 08:31:20 2011
@@ -38,6 +38,7 @@ import java.util.concurrent.Future;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.Cassandra;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.KeyRange;
@@ -110,16 +111,16 @@ public class ColumnFamilyInputFormat ext
             List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             IPartitioner partitioner = null;
-            Range jobRange = null;
+            Range<Token> jobRange = null;
             if (jobKeyRange != null)
             {
                 partitioner = ConfigHelper.getPartitioner(context.getConfiguration());
                 assert partitioner.preservesOrder() : "ConfigHelper.setInputKeyRange(..) can only be used with a order preserving paritioner";
                 assert jobKeyRange.start_key == null : "only start_token supported";
                 assert jobKeyRange.end_key == null : "only end_token supported";
-                jobRange = new Range(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
-                                     partitioner.getTokenFactory().fromString(jobKeyRange.end_token),
-                                     partitioner);
+                jobRange = new Range<Token>(partitioner.getTokenFactory().fromString(jobKeyRange.start_token),
+                                            partitioner.getTokenFactory().fromString(jobKeyRange.end_token),
+                                            partitioner);
             }
 
             for (TokenRange range : masterRangeNodes)
@@ -131,13 +132,13 @@ public class ColumnFamilyInputFormat ext
                 }
                 else
                 {
-                    Range dhtRange = new Range(partitioner.getTokenFactory().fromString(range.start_token),
-                                               partitioner.getTokenFactory().fromString(range.end_token),
-                                               partitioner);
+                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
+                                                             partitioner.getTokenFactory().fromString(range.end_token),
+                                                             partitioner);
 
                     if (dhtRange.intersects(jobRange))
                     {
-                        for (Range intersection: dhtRange.intersectionWith(jobRange))
+                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
                         {
                             range.start_token = partitioner.getTokenFactory().toString(intersection.left);
                             range.end_token = partitioner.getTokenFactory().toString(intersection.right);

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Thu Dec  1 08:31:20 2011
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.client.RingCache;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -119,7 +120,7 @@ implements org.apache.hadoop.mapred.Reco
     @Override
     public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
     {
-        Range range = ringCache.getRange(keybuff);
+        Range<Token> range = ringCache.getRange(keybuff);
 
         // get the client for the given range, or create a new one
         RangeClient client = clients.get(range);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java Thu Dec  1 08:31:20 2011
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowPosition;
 
 /**
  * Two approaches to building an IndexSummary:
@@ -86,10 +87,12 @@ public class IndexSummary
      */
     public static final class KeyPosition implements Comparable<KeyPosition>
     {
-        public final DecoratedKey<?> key;
+        // We allow RowPosition for the purpose of being able to select keys given a token, but the index
+        // should only contain true user provided keys, i.e. DecoratedKey, which is enforced by addEntry.
+        public final RowPosition key;
         public final long indexPosition;
 
-        public KeyPosition(DecoratedKey<?> key, long indexPosition)
+        public KeyPosition(RowPosition key, long indexPosition)
         {
             this.key = key;
             this.indexPosition = indexPosition;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java Thu Dec  1 08:31:20 2011
@@ -25,6 +25,7 @@ import java.util.Iterator;
 
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -35,7 +36,7 @@ public class SSTableBoundedScanner exten
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Range range)
+    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Range<Token> range)
     {
         super(sstable, skipCache);
         this.rangeIterator = sstable.getPositionsForRanges(Collections.singletonList(range)).iterator();

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java Thu Dec  1 08:31:20 2011
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -114,12 +115,12 @@ public class SSTableLoader
             return new LoaderFuture(0);
         }
 
-        Map<InetAddress, Collection<Range>> endpointToRanges = client.getEndpointToRangesMap();
+        Map<InetAddress, Collection<Range<Token>>> endpointToRanges = client.getEndpointToRangesMap();
         outputHandler.output(String.format("Streaming revelant part of %sto %s", names(sstables), endpointToRanges.keySet()));
 
         // There will be one streaming session by endpoint
         LoaderFuture future = new LoaderFuture(endpointToRanges.size());
-        for (Map.Entry<InetAddress, Collection<Range>> entry : endpointToRanges.entrySet())
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : endpointToRanges.entrySet())
         {
             InetAddress remote = entry.getKey();
             if (toIgnore.contains(remote))
@@ -127,7 +128,7 @@ public class SSTableLoader
                 future.latch.countDown();
                 continue;
             }
-            Collection<Range> ranges = entry.getValue();
+            Collection<Range<Token>> ranges = entry.getValue();
             StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future.latch, remote));
             // transferSSTables assumes references have been acquired
             SSTableReader.acquireReferences(sstables);
@@ -228,7 +229,7 @@ public class SSTableLoader
 
     public static abstract class Client
     {
-        private final Map<InetAddress, Collection<Range>> endpointToRanges = new HashMap<InetAddress, Collection<Range>>();
+        private final Map<InetAddress, Collection<Range<Token>>> endpointToRanges = new HashMap<InetAddress, Collection<Range<Token>>>();
         private IPartitioner partitioner;
 
         /**
@@ -253,7 +254,7 @@ public class SSTableLoader
          */
         public abstract boolean validateColumnFamily(String keyspace, String cfName);
 
-        public Map<InetAddress, Collection<Range>> getEndpointToRangesMap()
+        public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
         {
             return endpointToRanges;
         }
@@ -268,12 +269,12 @@ public class SSTableLoader
             return partitioner;
         }
 
-        protected void addRangeForEndpoint(Range range, InetAddress endpoint)
+        protected void addRangeForEndpoint(Range<Token> range, InetAddress endpoint)
         {
-            Collection<Range> ranges = endpointToRanges.get(endpoint);
+            Collection<Range<Token>> ranges = endpointToRanges.get(endpoint);
             if (ranges == null)
             {
-                ranges = new HashSet<Range>();
+                ranges = new HashSet<Range<Token>>();
                 endpointToRanges.put(endpoint, ranges);
             }
             ranges.add(range);

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Dec  1 08:31:20 2011
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.StorageService;
@@ -387,10 +388,10 @@ public class SSTableReader extends SSTab
     }
 
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
-    private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+    private IndexSummary.KeyPosition getIndexScanPosition(RowPosition key)
     {
         assert indexSummary.getIndexPositions() != null && indexSummary.getIndexPositions().size() > 0;
-        int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(decoratedKey, -1));
+        int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(key, -1));
         if (index < 0)
         {
             // binary search gives us the first index _greater_ than the key searched for,
@@ -451,7 +452,7 @@ public class SSTableReader extends SSTab
      * @param ranges
      * @return An estimate of the number of keys for given ranges in this SSTable.
      */
-    public long estimatedKeysForRanges(Collection<Range> ranges)
+    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
     {
         long sampleKeyCount = 0;
         List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary.getIndexPositions(), ranges);
@@ -469,24 +470,26 @@ public class SSTableReader extends SSTab
                                       new Function<IndexSummary.KeyPosition, DecoratedKey>(){
                                           public DecoratedKey apply(IndexSummary.KeyPosition kp)
                                           {
-                                              return kp.key;
+                                              // the index should only contain valid row keys; we only allow RowPosition in KeyPosition for search purposes
+                                              assert kp.key instanceof DecoratedKey;
+                                              return (DecoratedKey)kp.key;
                                           }
                                       });
     }
 
-    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(List<IndexSummary.KeyPosition> samples, Collection<Range> ranges)
+    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(List<IndexSummary.KeyPosition> samples, Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
         List<Pair<Integer,Integer>> positions = new ArrayList<Pair<Integer,Integer>>();
         if (samples.isEmpty())
             return positions;
 
-        for (AbstractBounds range : AbstractBounds.normalize(ranges))
+        for (AbstractBounds<Token> range : AbstractBounds.<Token>normalize(ranges))
         {
-            DecoratedKey leftKey = new DecoratedKey(range.left, null);
-            DecoratedKey rightKey = new DecoratedKey(range.right, null);
+            RowPosition leftPosition = range.left.maxKeyBound();
+            RowPosition rightPosition = range.right.maxKeyBound();
 
-            int left = Collections.binarySearch(samples, new IndexSummary.KeyPosition(leftKey, -1));
+            int left = Collections.binarySearch(samples, new IndexSummary.KeyPosition(leftPosition, -1));
             if (left < 0)
                 left = (left + 1) * -1;
             else
@@ -498,7 +501,7 @@ public class SSTableReader extends SSTab
 
             int right = Range.isWrapAround(range.left, range.right)
                       ? samples.size() - 1
-                      : Collections.binarySearch(samples, new IndexSummary.KeyPosition(rightKey, -1));
+                      : Collections.binarySearch(samples, new IndexSummary.KeyPosition(rightPosition, -1));
             if (right < 0)
             {
                 // range are end inclusive so we use the previous index from what binarySearch give us
@@ -518,7 +521,7 @@ public class SSTableReader extends SSTab
         return positions;
     }
 
-    public Iterable<DecoratedKey> getKeySamples(final Range range)
+    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
     {
         final List<IndexSummary.KeyPosition> samples = indexSummary.getIndexPositions();
 
@@ -555,7 +558,10 @@ public class SSTableReader extends SSTab
 
                     public DecoratedKey next()
                     {
-                        return samples.get(idx++).key;
+                        RowPosition k = samples.get(idx++).key;
+                        // the index should only contain valid row keys; we only allow RowPosition in KeyPosition for search purposes
+                        assert k instanceof DecoratedKey;
+                        return (DecoratedKey)k;
                     }
 
                     public void remove()
@@ -571,17 +577,18 @@ public class SSTableReader extends SSTab
      * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
      * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
      */
-    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range> ranges)
+    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
     {
         // use the index to determine a minimal section for each range
         List<Pair<Long,Long>> positions = new ArrayList<Pair<Long,Long>>();
-        for (AbstractBounds range : AbstractBounds.normalize(ranges))
+        for (AbstractBounds<Token> range : AbstractBounds.normalize(ranges))
         {
-            long left = getPosition(new DecoratedKey(range.left, null), Operator.GT);
+            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
+            long left = getPosition(keyRange.left, Operator.GT);
             if (left == -1)
                 // left is past the end of the file
                 continue;
-            long right = getPosition(new DecoratedKey(range.right, null), Operator.GT);
+            long right = getPosition(keyRange.right, Operator.GT);
             if (right == -1 || Range.isWrapAround(range.left, range.right))
                 // right is past the end of the file, or it wraps
                 right = uncompressedLength();
@@ -595,7 +602,6 @@ public class SSTableReader extends SSTab
 
     public void cacheKey(DecoratedKey key, Long info)
     {
-        assert key.key != null;
         // avoid keeping a permanent reference to the original key buffer
         DecoratedKey copiedKey = new DecoratedKey(key.token, ByteBufferUtil.clone(key.key));
         keyCache.put(new Pair<Descriptor, DecoratedKey>(descriptor, copiedKey), info);
@@ -614,23 +620,25 @@ public class SSTableReader extends SSTab
     }
 
     /**
-     * @param decoratedKey The key to apply as the rhs to the given Operator.
+     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
+     * allow key selection by token bounds but only if op != EQ
      * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
      * @return The position in the data file to find the key, or -1 if the key is not present
      */
-    public long getPosition(DecoratedKey decoratedKey, Operator op)
+    public long getPosition(RowPosition key, Operator op)
     {
         // first, check bloom filter
         if (op == Operator.EQ)
         {
-            assert decoratedKey.key != null; // null is ok for GE scans
-            if (!bf.isPresent(decoratedKey.key))
+            assert key instanceof DecoratedKey; // EQ only make sense if the key is a valid row key
+            if (!bf.isPresent(((DecoratedKey)key).key))
                 return -1;
         }
 
-        // next, the key cache
-        if (op == Operator.EQ || op == Operator.GE)
+        // next, the key cache (only makes sense for a valid row key)
+        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
         {
+            DecoratedKey decoratedKey = (DecoratedKey)key;
             Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(descriptor, decoratedKey);
             Long cachedPosition = getCachedPosition(unifiedKey, true);
             if (cachedPosition != null)
@@ -638,7 +646,7 @@ public class SSTableReader extends SSTab
         }
 
         // next, see if the sampled index says it's impossible for the key to be present
-        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(key);
         if (sampledPosition == null)
         {
             if (op == Operator.EQ)
@@ -660,15 +668,16 @@ public class SSTableReader extends SSTab
                     DecoratedKey indexDecoratedKey = decodeKey(partitioner, descriptor, ByteBufferUtil.readWithShortLength(input));
                     long dataPosition = input.readLong();
 
-                    int comparison = indexDecoratedKey.compareTo(decoratedKey);
+                    int comparison = indexDecoratedKey.compareTo(key);
                     int v = op.apply(comparison);
                     if (v == 0)
                     {
                         if (comparison == 0 && keyCache != null && keyCache.getCapacity() > 0)
                         {
+                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
+                            DecoratedKey decoratedKey = (DecoratedKey)key;
                             // store exact match for the key
-                            if (decoratedKey.key != null)
-                                cacheKey(decoratedKey, dataPosition);
+                            cacheKey(decoratedKey, dataPosition);
                         }
                         if (op == Operator.EQ)
                             bloomFilterTracker.addTruePositive();
@@ -795,7 +804,7 @@ public class SSTableReader extends SSTab
     * @param range the range of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public SSTableScanner getDirectScanner(Range range)
+    public SSTableScanner getDirectScanner(Range<Token> range)
     {
         return new SSTableBoundedScanner(this, true, range);
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Thu Dec  1 08:31:20 2011
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.util.RandomAccessReader;
@@ -84,7 +85,7 @@ public class SSTableScanner implements C
         file.close();
     }
 
-    public void seekTo(DecoratedKey<?> seekKey)
+    public void seekTo(RowPosition seekKey)
     {
         try
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Thu Dec  1 08:31:20 2011
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
 import org.apache.cassandra.service.DatacenterWriteResponseHandler;
@@ -83,14 +84,15 @@ public abstract class AbstractReplicatio
     }
 
     /**
-     * get the (possibly cached) endpoints that should store the given Token
+     * get the (possibly cached) endpoints that should store the given Token.
      * Note that while the endpoints are conceptually a Set (no duplicates will be included),
      * we return a List to avoid an extra allocation when sorting by proximity later
      * @param searchToken the token the natural endpoints are requested for
      * @return a copy of the natural endpoints for the given token
      */
-    public ArrayList<InetAddress> getNaturalEndpoints(Token searchToken)
+    public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
     {
+        Token searchToken = searchPosition.getToken();
         Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
         ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken);
         if (endpoints == null)
@@ -142,13 +144,13 @@ public abstract class AbstractReplicatio
      * (fixing this would probably require merging tokenmetadata into replicationstrategy,
      * so we could cache/invalidate cleanly.)
      */
-    public Multimap<InetAddress, Range> getAddressRanges(TokenMetadata metadata)
+    public Multimap<InetAddress, Range<Token>> getAddressRanges(TokenMetadata metadata)
     {
-        Multimap<InetAddress, Range> map = HashMultimap.create();
+        Multimap<InetAddress, Range<Token>> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
-            Range range = metadata.getPrimaryRangeFor(token);
+            Range<Token> range = metadata.getPrimaryRangeFor(token);
             for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
             {
                 map.put(ep, range);
@@ -158,13 +160,13 @@ public abstract class AbstractReplicatio
         return map;
     }
 
-    public Multimap<Range, InetAddress> getRangeAddresses(TokenMetadata metadata)
+    public Multimap<Range<Token>, InetAddress> getRangeAddresses(TokenMetadata metadata)
     {
-        Multimap<Range, InetAddress> map = HashMultimap.create();
+        Multimap<Range<Token>, InetAddress> map = HashMultimap.create();
 
         for (Token token : metadata.sortedTokens())
         {
-            Range range = metadata.getPrimaryRangeFor(token);
+            Range<Token> range = metadata.getPrimaryRangeFor(token);
             for (InetAddress ep : calculateNaturalEndpoints(token, metadata))
             {
                 map.put(range, ep);
@@ -174,12 +176,12 @@ public abstract class AbstractReplicatio
         return map;
     }
 
-    public Multimap<InetAddress, Range> getAddressRanges()
+    public Multimap<InetAddress, Range<Token>> getAddressRanges()
     {
         return getAddressRanges(tokenMetadata);
     }
 
-    public Collection<Range> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
+    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
     {
         TokenMetadata temp = metadata.cloneOnlyTokenMap();
         temp.updateNormalToken(pendingToken, pendingAddress);

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/TokenMetadata.java Thu Dec  1 08:31:20 2011
@@ -71,7 +71,7 @@ public class TokenMetadata
     // (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
     private Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
     // this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
-    private ConcurrentMap<String, Multimap<Range, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range, InetAddress>>();
+    private ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
 
     // nodes which are migrating to the new tokens in the ring
     private Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
@@ -108,7 +108,7 @@ public class TokenMetadata
     public int pendingRangeChanges(InetAddress source)
     {
         int n = 0;
-        Range sourceRange = getPrimaryRangeFor(getToken(source));
+        Range<Token> sourceRange = getPrimaryRangeFor(getToken(source));
         for (Token token : bootstrapTokens.keySet())
             if (sourceRange.contains(token))
                 n++;
@@ -423,9 +423,9 @@ public class TokenMetadata
         }
     }
 
-    public Range getPrimaryRangeFor(Token right)
+    public Range<Token> getPrimaryRangeFor(Token right)
     {
-        return new Range(getPredecessor(right), right);
+        return new Range<Token>(getPredecessor(right), right);
     }
 
     public ArrayList<Token> sortedTokens()
@@ -441,13 +441,13 @@ public class TokenMetadata
         }
     }
 
-    private Multimap<Range, InetAddress> getPendingRangesMM(String table)
+    private Multimap<Range<Token>, InetAddress> getPendingRangesMM(String table)
     {
-        Multimap<Range, InetAddress> map = pendingRanges.get(table);
+        Multimap<Range<Token>, InetAddress> map = pendingRanges.get(table);
         if (map == null)
         {
             map = HashMultimap.create();
-            Multimap<Range, InetAddress> priorMap = pendingRanges.putIfAbsent(table, map);
+            Multimap<Range<Token>, InetAddress> priorMap = pendingRanges.putIfAbsent(table, map);
             if (priorMap != null)
                 map = priorMap;
         }
@@ -455,15 +455,15 @@ public class TokenMetadata
     }
 
     /** a mutable map may be returned but caller should not modify it */
-    public Map<Range, Collection<InetAddress>> getPendingRanges(String table)
+    public Map<Range<Token>, Collection<InetAddress>> getPendingRanges(String table)
     {
         return getPendingRangesMM(table).asMap();
     }
 
-    public List<Range> getPendingRanges(String table, InetAddress endpoint)
+    public List<Range<Token>> getPendingRanges(String table, InetAddress endpoint)
     {
-        List<Range> ranges = new ArrayList<Range>();
-        for (Map.Entry<Range, InetAddress> entry : getPendingRangesMM(table).entries())
+        List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+        for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(table).entries())
         {
             if (entry.getValue().equals(endpoint))
             {
@@ -473,7 +473,7 @@ public class TokenMetadata
         return ranges;
     }
 
-    public void setPendingRanges(String table, Multimap<Range, InetAddress> rangeMap)
+    public void setPendingRanges(String table, Multimap<Range<Token>, InetAddress> rangeMap)
     {
         pendingRanges.put(table, rangeMap);
     }
@@ -545,7 +545,7 @@ public class TokenMetadata
             return includeMin ? Iterators.singletonIterator(StorageService.getPartitioner().getMinimumToken())
                               : Iterators.<Token>emptyIterator();
 
-        final boolean insertMin = (includeMin && !ring.get(0).equals(StorageService.getPartitioner().getMinimumToken())) ? true : false;
+        final boolean insertMin = (includeMin && !ring.get(0).isMinimum()) ? true : false;
         final int startIndex = firstTokenIndex(ring, start, insertMin);
         return new AbstractIterator<Token>()
         {
@@ -647,9 +647,9 @@ public class TokenMetadata
     {
         StringBuilder sb = new StringBuilder();
 
-        for (Map.Entry<String, Multimap<Range, InetAddress>> entry : pendingRanges.entrySet())
+        for (Map.Entry<String, Multimap<Range<Token>, InetAddress>> entry : pendingRanges.entrySet())
         {
-            for (Map.Entry<Range, InetAddress> rmap : entry.getValue().entries())
+            for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries())
             {
                 sb.append(rmap.getValue() + ":" + rmap.getKey());
                 sb.append(System.getProperty("line.separator"));
@@ -689,13 +689,13 @@ public class TokenMetadata
      */
     public Collection<InetAddress> getWriteEndpoints(Token token, String table, Collection<InetAddress> naturalEndpoints)
     {
-        Map<Range, Collection<InetAddress>> ranges = getPendingRanges(table);
+        Map<Range<Token>, Collection<InetAddress>> ranges = getPendingRanges(table);
         if (ranges.isEmpty())
             return naturalEndpoints;
 
         Set<InetAddress> endpoints = new HashSet<InetAddress>(naturalEndpoints);
 
-        for (Map.Entry<Range, Collection<InetAddress>> entry : ranges.entrySet())
+        for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
         {
             if (entry.getKey().contains(token))
             {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Dec  1 08:31:20 2011
@@ -67,7 +67,8 @@ public final class MessagingService impl
     public static final int VERSION_07 = 1;
     public static final int VERSION_080 = 2;
     public static final int VERSION_10 = 3;
-    public static final int version_ = VERSION_10;
+    public static final int VERSION_11 = 4;
+    public static final int version_ = VERSION_11;
 
     static SerializerType serializerType_ = SerializerType.BINARY;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Dec  1 08:31:20 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
@@ -118,7 +119,7 @@ public class AntiEntropyService
     /**
      * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
      */
-    public RepairFuture submitRepairSession(Range range, String tablename, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String tablename, String... cfnames)
     {
         RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture();
         executor.execute(futureTask);
@@ -145,10 +146,10 @@ public class AntiEntropyService
     /**
      * Return all of the neighbors with whom we share data.
      */
-    static Set<InetAddress> getNeighbors(String table, Range range)
+    static Set<InetAddress> getNeighbors(String table, Range<Token> range)
     {
         StorageService ss = StorageService.instance;
-        Map<Range, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
+        Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
         if (!replicaSets.containsKey(range))
             return Collections.emptySet();
         Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(range));
@@ -209,7 +210,7 @@ public class AntiEntropyService
     /**
      * Requests a tree from the given node, and returns the request that was sent.
      */
-    TreeRequest request(String sessionid, InetAddress remote, Range range, String ksname, String cfname)
+    TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
     {
         TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
         MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
@@ -429,7 +430,7 @@ public class AntiEntropyService
             dos.writeUTF(request.cf.left);
             dos.writeUTF(request.cf.right);
             if (version > MessagingService.VERSION_07)
-                AbstractBounds.serializer().serialize(request.range, dos);
+                AbstractBounds.serializer().serialize(request.range, dos, version);
         }
 
         public TreeRequest deserialize(DataInput dis, int version) throws IOException
@@ -437,11 +438,11 @@ public class AntiEntropyService
             String sessId = dis.readUTF();
             InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(dis);
             CFPair cfpair = new CFPair(dis.readUTF(), dis.readUTF());
-            Range range;
+            Range<Token> range;
             if (version > MessagingService.VERSION_07)
-                range = (Range) AbstractBounds.serializer().deserialize(dis);
+                range = (Range<Token>) AbstractBounds.serializer().deserialize(dis, version);
             else
-                range = new Range(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
+                range = new Range<Token>(StorageService.getPartitioner().getMinimumToken(), StorageService.getPartitioner().getMinimumToken());
 
             return new TreeRequest(sessId, endpoint, range, cfpair);
         }
@@ -555,10 +556,10 @@ public class AntiEntropyService
     {
         public final String sessionid;
         public final InetAddress endpoint;
-        public final Range range;
+        public final Range<Token> range;
         public final CFPair cf;
 
-        public TreeRequest(String sessionid, InetAddress endpoint, Range range, CFPair cf)
+        public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, CFPair cf)
         {
             this.sessionid = sessionid;
             this.endpoint = endpoint;
@@ -598,7 +599,7 @@ public class AntiEntropyService
         private final String sessionName;
         private final String tablename;
         private final String[] cfnames;
-        private final Range range;
+        private final Range<Token> range;
         private volatile Exception exception;
         private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
@@ -617,12 +618,12 @@ public class AntiEntropyService
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
-        public RepairSession(Range range, String tablename, String... cfnames)
+        public RepairSession(Range<Token> range, String tablename, String... cfnames)
         {
             this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, cfnames);
         }
 
-        private RepairSession(String id, Range range, String tablename, String[] cfnames)
+        private RepairSession(String id, Range<Token> range, String tablename, String[] cfnames)
         {
             this.sessionName = id;
             this.tablename = tablename;
@@ -895,14 +896,14 @@ public class AntiEntropyService
             public final String cfname;
             public final TreeResponse r1;
             public final TreeResponse r2;
-            public List<Range> differences;
+            public List<Range<Token>> differences;
 
             Differencer(String cfname, TreeResponse r1, TreeResponse r2)
             {
                 this.cfname = cfname;
                 this.r1 = r1;
                 this.r2 = r2;
-                this.differences = new ArrayList<Range>();
+                this.differences = new ArrayList<Range<Token>>();
             }
 
             /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1208993&r1=1208992&r2=1208993&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Dec  1 08:31:20 2011
@@ -50,6 +50,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
@@ -259,8 +260,9 @@ public class StorageProxy implements Sto
     private static Collection<InetAddress> getWriteEndpoints(String table, ByteBuffer key)
     {
         StorageService ss = StorageService.instance;
-        List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, key);
-        return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
+        Token tk = StorageService.getPartitioner().getToken(key);
+        List<InetAddress> naturalEndpoints = ss.getNaturalEndpoints(table, tk);
+        return ss.getTokenMetadata().getWriteEndpoints(tk, table, naturalEndpoints);
     }
 
     /**
@@ -817,8 +819,8 @@ public class StorageProxy implements Sto
         try
         {
             rows = new ArrayList<Row>(command.max_keys);
-            List<AbstractBounds> ranges = getRestrictedRanges(command.range);
-            for (AbstractBounds range : ranges)
+            List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range);
+            for (AbstractBounds<RowPosition> range : ranges)
             {
                 List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
                 DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
@@ -979,10 +981,10 @@ public class StorageProxy implements Sto
      * Compute all ranges we're going to query, in sorted order. Nodes can be replica destinations for many ranges,
      * so we need to restrict each scan to the specific range we want, or else we'd get duplicate results.
      */
-    static List<AbstractBounds> getRestrictedRanges(final AbstractBounds queryRange)
+    static <T extends RingPosition> List<AbstractBounds<T>> getRestrictedRanges(final AbstractBounds<T> queryRange)
     {
         // special case for bounds containing exactly 1 (non-minimum) token
-        if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.equals(StorageService.getPartitioner().getMinimumToken()))
+        if (queryRange instanceof Bounds && queryRange.left.equals(queryRange.right) && !queryRange.left.isMinimum(StorageService.getPartitioner()))
         {
             if (logger.isDebugEnabled())
                 logger.debug("restricted single token match for query {}", queryRange);
@@ -991,17 +993,28 @@ public class StorageProxy implements Sto
 
         TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();
 
-        List<AbstractBounds> ranges = new ArrayList<AbstractBounds>();
+        List<AbstractBounds<T>> ranges = new ArrayList<AbstractBounds<T>>();
         // divide the queryRange into pieces delimited by the ring and minimum tokens
-        Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left, true);
-        AbstractBounds remainder = queryRange;
+        Iterator<Token> ringIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true);
+        AbstractBounds<T> remainder = queryRange;
         while (ringIter.hasNext())
         {
             Token token = ringIter.next();
-            if (remainder == null || !(remainder.left.equals(token) || remainder.contains(token)))
+            /*
+             * remainder can be a range/bounds of token _or_ keys and we want to split it with a token:
+             *   - if remainder is tokens, then we'll just split using the provided token.
+             *   - if remainder is keys, we want to split using token.upperBoundKey. For instance, if remainder
+             *     is [DK(10, 'foo'), DK(20, 'bar')], and we have 3 nodes with tokens 0, 15, 30. We want to
+             *     split remainder to A=[DK(10, 'foo'), 15] and B=(15, DK(20, 'bar')]. But since we can't mix
+             *     tokens and keys at the same time in a range, we use 15.upperBoundKey() to have A include all
+             *     keys having 15 as token and B include none of those (since that is what our node owns).
+             * asSplitValue() abstracts that choice.
+             */
+            T splitValue = (T)token.asSplitValue(queryRange.left.getClass());
+            if (remainder == null || !(remainder.left.equals(splitValue) || remainder.contains(splitValue)))
                 // no more splits
                 break;
-            Pair<AbstractBounds,AbstractBounds> splits = remainder.split(token);
+            Pair<AbstractBounds<T>,AbstractBounds<T>> splits = remainder.split(splitValue);
             if (splits.left != null)
                 ranges.add(splits.left);
             remainder = splits.right;
@@ -1094,13 +1107,13 @@ public class StorageProxy implements Sto
     {
         IPartitioner p = StorageService.getPartitioner();
 
-        Token leftToken = index_clause.start_key == null ? p.getMinimumToken() : p.getToken(index_clause.start_key);
-        List<AbstractBounds> ranges = getRestrictedRanges(new Bounds(leftToken, p.getMinimumToken()));
+        RowPosition leftPos = RowPosition.forKey(index_clause.start_key, p);
+        List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(new Bounds<RowPosition>(leftPos, p.getMinimumToken().minKeyBound()));
         logger.debug("scan ranges are {}", StringUtils.join(ranges, ","));
 
         // now scan until we have enough results
         List<Row> rows = new ArrayList<Row>(index_clause.count);
-        for (AbstractBounds range : ranges)
+        for (AbstractBounds<RowPosition> range : ranges)
         {
             List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, range.right);
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);



Mime
View raw message