cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [07/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:31 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
deleted file mode 100644
index b2d63aa..0000000
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ /dev/null
@@ -1,568 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tools;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-import org.codehaus.jackson.map.MappingJsonFactory;
-import org.codehaus.jackson.type.TypeReference;
-
-/**
- * Create SSTables from JSON input
- */
-public class SSTableImport
-{
-    private static final String KEYSPACE_OPTION = "K";
-    private static final String COLUMN_FAMILY_OPTION = "c";
-    private static final String KEY_COUNT_OPTION = "n";
-    private static final String IS_SORTED_OPTION = "s";
-
-    private static final Options options = new Options();
-    private static CommandLine cmd;
-
-    private Integer keyCountToImport;
-    private final boolean isSorted;
-
-    private static final JsonFactory factory = new MappingJsonFactory().configure(
-                                                                                 JsonParser.Feature.INTERN_FIELD_NAMES, false);
-
-    static
-    {
-        Option optKeyspace = new Option(KEYSPACE_OPTION, true, "Keyspace name.");
-        optKeyspace.setRequired(true);
-        options.addOption(optKeyspace);
-
-        Option optColfamily = new Option(COLUMN_FAMILY_OPTION, true, "Table name.");
-        optColfamily.setRequired(true);
-        options.addOption(optColfamily);
-
-        options.addOption(new Option(KEY_COUNT_OPTION, true, "Number of keys to import (Optional)."));
-        options.addOption(new Option(IS_SORTED_OPTION, false, "Assume JSON file as already sorted (e.g. created by sstable2json tool) (Optional)."));
-    }
-
-    private static class JsonColumn<T>
-    {
-        private ByteBuffer name;
-        private ByteBuffer value;
-        private long timestamp;
-
-        private String kind;
-        // Expiring columns
-        private int ttl;
-        private int localExpirationTime;
-
-        // Counter columns
-        private long timestampOfLastDelete;
-
-        public JsonColumn(T json, CFMetaData meta)
-        {
-            if (json instanceof List)
-            {
-                CellNameType comparator = meta.comparator;
-                List fields = (List<?>) json;
-
-                assert fields.size() >= 3 : "Cell definition should have at least 3";
-
-                name  = stringAsType((String) fields.get(0), comparator.asAbstractType());
-                timestamp = (Long) fields.get(2);
-                kind = "";
-
-                if (fields.size() > 3)
-                {
-                    kind = (String) fields.get(3);
-                    if (isExpiring())
-                    {
-                        ttl = (Integer) fields.get(4);
-                        localExpirationTime = (Integer) fields.get(5);
-                    }
-                    else if (isCounter())
-                    {
-                        timestampOfLastDelete = ((Integer) fields.get(4));
-                    }
-                    else if (isRangeTombstone())
-                    {
-                        localExpirationTime = (Integer) fields.get(4);
-                    }
-                }
-
-                if (isDeleted())
-                {
-                    value = ByteBufferUtil.bytes((Integer) fields.get(1));
-                }
-                else if (isRangeTombstone())
-                {
-                    value = stringAsType((String) fields.get(1), comparator.asAbstractType());
-                }
-                else
-                {
-                    assert meta.isCQL3Table() || name.hasRemaining() : "Cell name should not be empty";
-                    value = stringAsType((String) fields.get(1),
-                                         meta.getValueValidator(name.hasRemaining()
-                                                                ? comparator.cellFromByteBuffer(name)
-                                                                : meta.comparator.rowMarker(Composites.EMPTY)));
-                }
-            }
-        }
-
-        public boolean isDeleted()
-        {
-            return kind.equals("d");
-        }
-
-        public boolean isExpiring()
-        {
-            return kind.equals("e");
-        }
-
-        public boolean isCounter()
-        {
-            return kind.equals("c");
-        }
-
-        public boolean isRangeTombstone()
-        {
-            return kind.equals("t");
-        }
-
-        public ByteBuffer getName()
-        {
-            return name.duplicate();
-        }
-
-        public ByteBuffer getValue()
-        {
-            return value.duplicate();
-        }
-    }
-
-    public SSTableImport()
-    {
-        this(null, false);
-    }
-
-    public SSTableImport(boolean isSorted)
-    {
-        this(null, isSorted);
-    }
-
-    public SSTableImport(Integer keyCountToImport, boolean isSorted)
-    {
-        this.keyCountToImport = keyCountToImport;
-        this.isSorted = isSorted;
-    }
-
-    /**
-     * Add columns to a column family.
-     *
-     * @param row the columns associated with a row
-     * @param cfamily the column family to add columns to
-     */
-    private void addColumnsToCF(List<?> row, ColumnFamily cfamily)
-    {
-        CFMetaData cfm = cfamily.metadata();
-        assert cfm != null;
-
-        for (Object c : row)
-        {
-            JsonColumn col = new JsonColumn<List>((List) c, cfm);
-            if (col.isRangeTombstone())
-            {
-                Composite start = cfm.comparator.fromByteBuffer(col.getName());
-                Composite end = cfm.comparator.fromByteBuffer(col.getValue());
-                cfamily.addAtom(new RangeTombstone(start, end, col.timestamp, col.localExpirationTime));
-                continue;
-            }
-
-            assert cfm.isCQL3Table() || col.getName().hasRemaining() : "Cell name should not be empty";
-            CellName cname = col.getName().hasRemaining() ? cfm.comparator.cellFromByteBuffer(col.getName())
-                                                          : cfm.comparator.rowMarker(Composites.EMPTY);
-
-            if (col.isExpiring())
-            {
-                cfamily.addColumn(new BufferExpiringCell(cname, col.getValue(), col.timestamp, col.ttl, col.localExpirationTime));
-            }
-            else if (col.isCounter())
-            {
-                cfamily.addColumn(new BufferCounterCell(cname, col.getValue(), col.timestamp, col.timestampOfLastDelete));
-            }
-            else if (col.isDeleted())
-            {
-                cfamily.addTombstone(cname, col.getValue(), col.timestamp);
-            }
-            else if (col.isRangeTombstone())
-            {
-                CellName end = cfm.comparator.cellFromByteBuffer(col.getValue());
-                cfamily.addAtom(new RangeTombstone(cname, end, col.timestamp, col.localExpirationTime));
-            }
-            // cql3 row marker, see CASSANDRA-5852
-            else if (cname.isEmpty())
-            {
-                cfamily.addColumn(cfm.comparator.rowMarker(Composites.EMPTY), col.getValue(), col.timestamp);
-            }
-            else
-            {
-                cfamily.addColumn(cname, col.getValue(), col.timestamp);
-            }
-        }
-    }
-
-    private void parseMeta(Map<?, ?> map, ColumnFamily cf, ByteBuffer superColumnName)
-    {
-
-        // deletionInfo is the only metadata we store for now
-        if (map.containsKey("deletionInfo"))
-        {
-            Map<?, ?> unparsedDeletionInfo = (Map<?, ?>) map.get("deletionInfo");
-            Number number = (Number) unparsedDeletionInfo.get("markedForDeleteAt");
-            long markedForDeleteAt = number instanceof Long ? (Long) number : number.longValue();
-            int localDeletionTime = (Integer) unparsedDeletionInfo.get("localDeletionTime");
-            if (superColumnName == null)
-                cf.setDeletionInfo(new DeletionInfo(markedForDeleteAt, localDeletionTime));
-            else
-                cf.addAtom(new RangeTombstone(SuperColumns.startOf(superColumnName), SuperColumns.endOf(superColumnName), markedForDeleteAt, localDeletionTime));
-        }
-    }
-
-    /**
-     * Convert a JSON formatted file to an SSTable.
-     *
-     * @param jsonFile the file containing JSON formatted data
-     * @param keyspace keyspace the data belongs to
-     * @param cf column family the data belongs to
-     * @param ssTablePath file to write the SSTable to
-     *
-     * @throws IOException for errors reading/writing input/output
-     */
-    public int importJson(String jsonFile, String keyspace, String cf, String ssTablePath) throws IOException
-    {
-        ColumnFamily columnFamily = ArrayBackedSortedColumns.factory.create(keyspace, cf);
-        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
-
-        int importedKeys = (isSorted) ? importSorted(jsonFile, columnFamily, ssTablePath, partitioner)
-                                      : importUnsorted(jsonFile, columnFamily, ssTablePath, partitioner);
-
-        if (importedKeys != -1)
-            System.out.printf("%d keys imported successfully.%n", importedKeys);
-
-        return importedKeys;
-    }
-
-    private int importUnsorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath, IPartitioner partitioner) throws IOException
-    {
-        int importedKeys = 0;
-        long start = System.nanoTime();
-
-        Object[] data;
-        try (JsonParser parser = getParser(jsonFile))
-        {
-            data = parser.readValueAs(new TypeReference<Object[]>(){});
-        }
-
-        keyCountToImport = (keyCountToImport == null) ? data.length : keyCountToImport;
-
-        try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE, 0))
-        {
-            System.out.printf("Importing %s keys...%n", keyCountToImport);
-
-            // sort by dk representation, but hold onto the hex version
-            SortedMap<DecoratedKey, Map<?, ?>> decoratedKeys = new TreeMap<DecoratedKey, Map<?, ?>>();
-
-            for (Object row : data)
-            {
-                Map<?, ?> rowAsMap = (Map<?, ?>) row;
-                decoratedKeys.put(partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) rowAsMap.get("key"))), rowAsMap);
-            }
-
-            for (Map.Entry<DecoratedKey, Map<?, ?>> row : decoratedKeys.entrySet())
-            {
-                if (row.getValue().containsKey("metadata"))
-                {
-                    parseMeta((Map<?, ?>) row.getValue().get("metadata"), columnFamily, null);
-                }
-
-                Object columns = row.getValue().get("cells");
-                addColumnsToCF((List<?>) columns, columnFamily);
-
-
-                writer.append(row.getKey(), columnFamily);
-                columnFamily.clear();
-
-                importedKeys++;
-
-                long current = System.nanoTime();
-
-                if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
-                {
-                    System.out.printf("Currently imported %d keys.%n", importedKeys);
-                    start = current;
-                }
-
-                if (keyCountToImport == importedKeys)
-                    break;
-            }
-
-            writer.finish(true);
-        }
-
-        return importedKeys;
-    }
-
-    private int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath,
-                             IPartitioner partitioner) throws IOException
-    {
-        int importedKeys = 0; // already imported keys count
-        long start = System.nanoTime();
-
-        try (JsonParser parser = getParser(jsonFile))
-        {
-
-            if (keyCountToImport == null)
-            {
-                keyCountToImport = 0;
-                System.out.println("Counting keys to import, please wait... (NOTE: to skip this use -n <num_keys>)");
-
-                parser.nextToken(); // START_ARRAY
-                while (parser.nextToken() != null)
-                {
-                    parser.skipChildren();
-                    if (parser.getCurrentToken() == JsonToken.END_ARRAY)
-                        break;
-
-                    keyCountToImport++;
-                }
-            }
-            System.out.printf("Importing %s keys...%n", keyCountToImport);
-        }
-
-        try (JsonParser parser = getParser(jsonFile); // renewing parser
-             SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
-        {
-            int lineNumber = 1;
-            DecoratedKey prevStoredKey = null;
-
-            parser.nextToken(); // START_ARRAY
-            while (parser.nextToken() != null)
-            {
-                String key = parser.getCurrentName();
-                Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){});
-                DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key")));
-
-                if (row.containsKey("metadata"))
-                    parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null);
-
-                addColumnsToCF((List<?>) row.get("cells"), columnFamily);
-
-                if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
-                {
-                    System.err
-                    .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
-                            lineNumber, key);
-                    return -1;
-                }
-
-                // saving decorated key
-                writer.append(currentKey, columnFamily);
-                columnFamily.clear();
-
-                prevStoredKey = currentKey;
-                importedKeys++;
-                lineNumber++;
-
-                long current = System.nanoTime();
-
-                if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
-                {
-                    System.out.printf("Currently imported %d keys.%n", importedKeys);
-                    start = current;
-                }
-
-                if (keyCountToImport == importedKeys)
-                    break;
-
-            }
-
-            writer.finish(true);
-
-            return importedKeys;
-        }
-    }
-
-    /**
-     * Get key validator for column family
-     * @param columnFamily column family instance
-     * @return key validator for given column family
-     */
-    private AbstractType<?> getKeyValidator(ColumnFamily columnFamily) {
-        // this is a fix to support backward compatibility
-        // which allows to skip the current key validator
-        // please, take a look onto CASSANDRA-7498 for more details
-        if ("true".equals(System.getProperty("skip.key.validator", "false"))) {
-            return BytesType.instance;
-        }
-        return columnFamily.metadata().getKeyValidator();
-    }
-
-    /**
-     * Get JsonParser object for file
-     * @param fileName name of the file
-     * @return json parser instance for given file
-     * @throws IOException if any I/O error.
-     */
-    private JsonParser getParser(String fileName) throws IOException
-    {
-        return factory.createJsonParser(new File(fileName));
-    }
-
-    /**
-     * Converts JSON to an SSTable file. JSON input can either be a file specified
-     * using an optional command line argument, or supplied on standard in.
-     *
-     * @param args command line arguments
-     * @throws ParseException on failure to parse JSON input
-     * @throws ConfigurationException on configuration error.
-     */
-    public static void main(String[] args) throws ParseException, ConfigurationException
-    {
-        System.err.println("WARNING: please note that json2sstable is now deprecated and will be removed in Cassandra 3.0. "
-                         + "You should use CQLSSTableWriter if you want to write sstables directly. "
-                         + "Please see https://issues.apache.org/jira/browse/CASSANDRA-9618 for details.");
-
-        CommandLineParser parser = new PosixParser();
-
-        try
-        {
-            cmd = parser.parse(options, args);
-        }
-        catch (org.apache.commons.cli.ParseException e)
-        {
-            System.err.println(e.getMessage());
-            printProgramUsage();
-            System.exit(1);
-        }
-
-        if (cmd.getArgs().length != 2)
-        {
-            printProgramUsage();
-            System.exit(1);
-        }
-
-        String json     = cmd.getArgs()[0];
-        String ssTable  = cmd.getArgs()[1];
-        String keyspace = cmd.getOptionValue(KEYSPACE_OPTION);
-        String cfamily  = cmd.getOptionValue(COLUMN_FAMILY_OPTION);
-
-        Integer keyCountToImport = null;
-        boolean isSorted = false;
-
-        if (cmd.hasOption(KEY_COUNT_OPTION))
-        {
-            keyCountToImport = Integer.valueOf(cmd.getOptionValue(KEY_COUNT_OPTION));
-        }
-
-        if (cmd.hasOption(IS_SORTED_OPTION))
-        {
-            isSorted = true;
-        }
-
-        Schema.instance.loadFromDisk(false);
-        if (Schema.instance.getNonSystemKeyspaces().size() < 1)
-        {
-            String msg = "no non-system keyspaces are defined";
-            System.err.println(msg);
-            throw new ConfigurationException(msg);
-        }
-
-        try
-        {
-            new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable);
-        }
-        catch (Exception e)
-        {
-            JVMStabilityInspector.inspectThrowable(e);
-            e.printStackTrace();
-            System.err.println("ERROR: " + e.getMessage());
-            System.exit(-1);
-        }
-
-        System.exit(0);
-    }
-
-    private static void printProgramUsage()
-    {
-        System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
-                          SSTableImport.class.getName());
-
-        System.out.println("Options:");
-        for (Object o :  options.getOptions())
-        {
-            Option opt = (Option) o;
-            System.out.println("  -" +opt.getOpt() + " - " + opt.getDescription());
-        }
-    }
-
-    /**
-     * Convert a string to bytes (ByteBuffer) according to type
-     * @param content string to convert
-     * @param type type to use for conversion
-     * @return byte buffer representation of the given string
-     */
-    private static ByteBuffer stringAsType(String content, AbstractType<?> type)
-    {
-        try
-        {
-            return type.fromString(content);
-        }
-        catch (MarshalException e)
-        {
-            throw new RuntimeException(e.getMessage());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index f66269d..f4d6450 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -25,9 +25,8 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.CFRowAdder;
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
@@ -85,44 +84,36 @@ public final class TraceKeyspace
                                              String command,
                                              int ttl)
     {
-        Mutation mutation = new Mutation(NAME, sessionId);
-        ColumnFamily cells = mutation.addOrGet(TraceKeyspace.Sessions);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Sessions, FBUtilities.timestampMicros(), ttl, sessionId)
+                                 .clustering()
+                                 .add("client", client)
+                                 .add("coordinator", FBUtilities.getBroadcastAddress())
+                                 .add("request", request)
+                                 .add("started_at", new Date(startedAt))
+                                 .add("command", command);
 
-        CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl);
-        adder.add("client", client)
-             .add("coordinator", FBUtilities.getBroadcastAddress())
-             .add("request", request)
-             .add("started_at", new Date(startedAt))
-             .add("command", command);
         for (Map.Entry<String, String> entry : parameters.entrySet())
             adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
-
-        return mutation;
+        return adder.build();
     }
 
     static Mutation makeStopSessionMutation(ByteBuffer sessionId, int elapsed, int ttl)
     {
-        Mutation mutation = new Mutation(NAME, sessionId);
-        ColumnFamily cells = mutation.addOrGet(Sessions);
-
-        CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl);
-        adder.add("duration", elapsed);
-
-        return mutation;
+        return new RowUpdateBuilder(Sessions, FBUtilities.timestampMicros(), ttl, sessionId)
+               .clustering()
+               .add("duration", elapsed)
+               .build();
     }
 
     static Mutation makeEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName, int ttl)
     {
-        Mutation mutation = new Mutation(NAME, sessionId);
-        ColumnFamily cells = mutation.addOrGet(Events);
-
-        CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros(), ttl);
-        adder.add("activity", message)
-             .add("source", FBUtilities.getBroadcastAddress())
-             .add("thread", threadName);
+        RowUpdateBuilder adder = new RowUpdateBuilder(Events, FBUtilities.timestampMicros(), ttl, sessionId)
+                                 .clustering(UUIDGen.getTimeUUID());
+        adder.add("activity", message);
+        adder.add("source", FBUtilities.getBroadcastAddress());
+        adder.add("thread", threadName);
         if (elapsed >= 0)
             adder.add("source_elapsed", elapsed);
-
-        return mutation;
+        return adder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 4e21678..4e54e46 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -141,6 +141,6 @@ public class QueryMessage extends Message.Request
     @Override
     public String toString()
     {
-        return "QUERY " + query;
+        return "QUERY " + query + "[pageSize = " + options.getPageSize() + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/triggers/ITrigger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/ITrigger.java b/src/java/org/apache/cassandra/triggers/ITrigger.java
index 21aba05..ad631d1 100644
--- a/src/java/org/apache/cassandra/triggers/ITrigger.java
+++ b/src/java/org/apache/cassandra/triggers/ITrigger.java
@@ -24,11 +24,11 @@ package org.apache.cassandra.triggers;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.Partition;
 
 /**
- * Trigger interface, For every Mutation received by the coordinator {@link #augment(ByteBuffer, ColumnFamily)}
+ * Trigger interface, For every partition update received by the coordinator {@link #augment(Partition)}
  * is called.<p>
  *
  * <b> Contract:</b><br>
@@ -44,9 +44,8 @@ public interface ITrigger
     /**
      * Called exactly once per CF update, returned mutations are atomically updated.
      *
-     * @param partitionKey - partition Key for the update.
      * @param update - update received for the CF
      * @return additional modifications to be applied along with the supplied update
      */
-    public Collection<Mutation> augment(ByteBuffer partitionKey, ColumnFamily update);
+    public Collection<Mutation> augment(Partition update);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
index 973ad8b..071a973 100644
--- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
+++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java
@@ -22,13 +22,16 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Maps;
 
 import org.apache.cassandra.config.TriggerDefinition;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
@@ -62,7 +65,7 @@ public class TriggerExecutor
 
     /**
      * Augment a partition update by executing triggers to generate an intermediate
-     * set of mutations, then merging the ColumnFamily from each mutation with those
+     * set of mutations, then merging the update from each mutation with those
      * supplied. This is called from @{link org.apache.cassandra.service.StorageProxy#cas}
      * which is scoped for a single partition. For that reason, any mutations generated
      * by triggers are checked to ensure that they are for the same table and partition
@@ -77,22 +80,13 @@ public class TriggerExecutor
      * @throws InvalidRequestException if any mutation generated by a trigger does not
      * apply to the exact same partition as the initial update
      */
-    public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException
+    public PartitionUpdate execute(PartitionUpdate updates) throws InvalidRequestException
     {
-        List<Mutation> intermediate = executeInternal(key, updates);
+        List<Mutation> intermediate = executeInternal(updates);
         if (intermediate == null || intermediate.isEmpty())
             return updates;
 
-        validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate);
-
-        for (Mutation mutation : intermediate)
-        {
-            for (ColumnFamily cf : mutation.getColumnFamilies())
-            {
-                updates.addAll(cf);
-            }
-        }
-        return updates;
+        return PartitionUpdate.merge(validateForSinglePartition(updates.metadata().cfId, updates.partitionKey(), intermediate));
     }
 
     /**
@@ -120,9 +114,9 @@ public class TriggerExecutor
             if (mutation instanceof CounterMutation)
                 hasCounters = true;
 
-            for (ColumnFamily cf : mutation.getColumnFamilies())
+            for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
-                List<Mutation> augmentations = executeInternal(mutation.key(), cf);
+                List<Mutation> augmentations = executeInternal(upd);
                 if (augmentations == null || augmentations.isEmpty())
                     continue;
 
@@ -148,54 +142,66 @@ public class TriggerExecutor
 
     private Collection<Mutation> mergeMutations(Iterable<Mutation> mutations)
     {
-        Map<Pair<String, ByteBuffer>, Mutation> groupedMutations = new HashMap<>();
+        ListMultimap<Pair<String, ByteBuffer>, Mutation> groupedMutations = ArrayListMultimap.create();
 
         for (Mutation mutation : mutations)
         {
-            Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key());
-            Mutation current = groupedMutations.get(key);
-            if (current == null)
-            {
-                // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap().
-                groupedMutations.put(key, mutation.copy());
-            }
-            else
-            {
-                current.addAll(mutation);
-            }
+            Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key().getKey());
+            groupedMutations.put(key, mutation);
         }
 
-        return groupedMutations.values();
+        List<Mutation> merged = new ArrayList<>(groupedMutations.size());
+        for (Pair<String, ByteBuffer> key : groupedMutations.keySet())
+            merged.add(Mutation.merge(groupedMutations.get(key)));
+
+        return merged;
     }
 
-    private void validateForSinglePartition(AbstractType<?> keyValidator,
-                                            UUID cfId,
-                                            ByteBuffer key,
-                                            Collection<Mutation> tmutations)
+    private Collection<PartitionUpdate> validateForSinglePartition(UUID cfId,
+                                                                   DecoratedKey key,
+                                                                   Collection<Mutation> tmutations)
     throws InvalidRequestException
     {
-        for (Mutation mutation : tmutations)
+        validate(tmutations);
+
+        if (tmutations.size() == 1)
         {
-            if (keyValidator.compare(mutation.key(), key) != 0)
-                throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
+            Collection<PartitionUpdate> updates = Iterables.getOnlyElement(tmutations).getPartitionUpdates();
+            if (updates.size() > 1)
+                throw new InvalidRequestException("The updates generated by triggers are not all for the same partition");
+            validateSamePartition(cfId, key, Iterables.getOnlyElement(updates));
+            return updates;
+        }
 
-            for (ColumnFamily cf : mutation.getColumnFamilies())
+        ArrayList<PartitionUpdate> updates = new ArrayList<>(tmutations.size());
+        for (Mutation mutation : tmutations)
+        {
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
             {
-                if (! cf.id().equals(cfId))
-                    throw new InvalidRequestException("table of additional mutation does not match primary update table");
+                validateSamePartition(cfId, key, update);
+                updates.add(update);
             }
         }
-        validate(tmutations);
+        return updates;
+    }
+
+    private void validateSamePartition(UUID cfId, DecoratedKey key, PartitionUpdate update)
+    throws InvalidRequestException
+    {
+        if (!key.equals(update.partitionKey()))
+            throw new InvalidRequestException("Partition key of additional mutation does not match primary update key");
+
+        if (!cfId.equals(update.metadata().cfId))
+            throw new InvalidRequestException("table of additional mutation does not match primary update table");
     }
 
     private void validate(Collection<Mutation> tmutations) throws InvalidRequestException
     {
         for (Mutation mutation : tmutations)
         {
-            QueryProcessor.validateKey(mutation.key());
-            for (ColumnFamily tcf : mutation.getColumnFamilies())
-                for (Cell cell : tcf)
-                    cell.validateFields(tcf.metadata());
+            QueryProcessor.validateKey(mutation.key().getKey());
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
+                update.validate();
         }
     }
 
@@ -203,9 +209,9 @@ public class TriggerExecutor
      * Switch class loader before using the triggers for the column family, if
      * not loaded them with the custom class loader.
      */
-    private List<Mutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily)
+    private List<Mutation> executeInternal(PartitionUpdate update)
     {
-        Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers();
+        Map<String, TriggerDefinition> triggers = update.metadata().getTriggers();
         if (triggers.isEmpty())
             return null;
         List<Mutation> tmutations = Lists.newLinkedList();
@@ -220,7 +226,7 @@ public class TriggerExecutor
                     trigger = loadTriggerInstance(td.classOption);
                     cachedTriggers.put(td.classOption, trigger);
                 }
-                Collection<Mutation> temp = trigger.augment(key, columnFamily);
+                Collection<Mutation> temp = trigger.augment(update);
                 if (temp != null)
                     tmutations.addAll(temp);
             }
@@ -228,7 +234,7 @@ public class TriggerExecutor
         }
         catch (Exception ex)
         {
-            throw new RuntimeException(String.format("Exception while creating trigger on table with ID: %s", columnFamily.id()), ex);
+            throw new RuntimeException(String.format("Exception while creating trigger on table with ID: %s", update.metadata().cfId), ex);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
index 1831c19..69915bf 100644
--- a/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
+++ b/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.UUID;
 
 import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -322,6 +323,12 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, length);
     }
 
+    public static int serializedSizeWithLength(ByteBuffer buffer, TypeSizes sizes)
+    {
+        int size = buffer.remaining();
+        return sizes.sizeof(size) + size;
+    }
+
     /* @return An unsigned short in an integer. */
     public static int readShortLength(DataInput in) throws IOException
     {
@@ -338,16 +345,21 @@ public class ByteBufferUtil
         return ByteBufferUtil.read(in, readShortLength(in));
     }
 
+    public static int serializedSizeWithShortLength(ByteBuffer buffer, TypeSizes sizes)
+    {
+        int size = buffer.remaining();
+        return sizes.sizeof((short)size) + size;
+    }
+
     /**
      * @param in data input
      * @return null
      * @throws IOException if an I/O error occurs.
      */
-    public static ByteBuffer skipShortLength(DataInput in) throws IOException
+    public static void skipShortLength(DataInput in) throws IOException
     {
         int skip = readShortLength(in);
         FileUtils.skipBytesFully(in, skip);
-        return null;
     }
 
     public static ByteBuffer read(DataInput in, int length) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 17edeb0..b0bed7b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -372,6 +372,11 @@ public class FBUtilities
         return System.currentTimeMillis() * 1000;
     }
 
+    public static int nowInSeconds()
+    {
+        return (int)(System.currentTimeMillis() / 1000);
+    }
+
     public static void waitOnFutures(Iterable<Future<?>> futures)
     {
         for (Future f : futures)
@@ -502,13 +507,18 @@ public class FBUtilities
         }
     }
 
-    public static <T> SortedSet<T> singleton(T column, Comparator<? super T> comparator)
+    public static <T> NavigableSet<T> singleton(T column, Comparator<? super T> comparator)
     {
-        SortedSet<T> s = new TreeSet<T>(comparator);
+        NavigableSet<T> s = new TreeSet<T>(comparator);
         s.add(column);
         return s;
     }
 
+    public static <T> NavigableSet<T> emptySortedSet(Comparator<? super T> comparator)
+    {
+        return new TreeSet<T>(comparator);
+    }
+
     public static String toString(Map<?,?> map)
     {
         Joiner.MapJoiner joiner = Joiner.on(", ").withKeyValueSeparator(":");
@@ -796,4 +806,30 @@ public class FBUtilities
         digest.update((byte) ((val >>>  8) & 0xFF));
         digest.update((byte)  ((val >>> 0) & 0xFF));
     }
+
+    public static void updateWithBoolean(MessageDigest digest, boolean val)
+    {
+        updateWithByte(digest, val ? 0 : 1);
+    }
+
+    public static void closeAll(List<? extends AutoCloseable> l) throws Exception
+    {
+        Exception toThrow = null;
+        for (AutoCloseable c : l)
+        {
+            try
+            {
+                c.close();
+            }
+            catch (Exception e)
+            {
+                if (toThrow == null)
+                    toThrow = e;
+                else
+                    toThrow.addSuppressed(e);
+            }
+        }
+        if (toThrow != null)
+            throw toThrow;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/MergeIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java
index e61326e..d0f116e 100644
--- a/src/java/org/apache/cassandra/utils/MergeIterator.java
+++ b/src/java/org/apache/cassandra/utils/MergeIterator.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.utils;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
@@ -35,9 +34,9 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
         this.reducer = reducer;
     }
 
-    public static <In, Out> IMergeIterator<In, Out> get(List<? extends Iterator<In>> sources,
-                                                        Comparator<In> comparator,
-                                                        Reducer<In, Out> reducer)
+    public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources,
+                                                       Comparator<? super In> comparator,
+                                                       Reducer<In, Out> reducer)
     {
         if (sources.size() == 1)
         {
@@ -59,9 +58,10 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
         {
             try
             {
-                ((Closeable)iterator).close();
+                if (iterator instanceof AutoCloseable)
+                    ((AutoCloseable)iterator).close();
             }
-            catch (IOException e)
+            catch (Exception e)
             {
                 throw new RuntimeException(e);
             }
@@ -79,13 +79,13 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
         // TODO: if we had our own PriorityQueue implementation we could stash items
         // at the end of its array, so we wouldn't need this storage
         protected final ArrayDeque<Candidate<In>> candidates;
-        public ManyToOne(List<? extends Iterator<In>> iters, Comparator<In> comp, Reducer<In, Out> reducer)
+        public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer)
         {
             super(iters, reducer);
             this.queue = new PriorityQueue<>(Math.max(1, iters.size()));
-            for (Iterator<In> iter : iters)
+            for (int i = 0; i < iters.size(); i++)
             {
-                Candidate<In> candidate = new Candidate<>(iter, comp);
+                Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp);
                 if (!candidate.advance())
                     // was empty
                     continue;
@@ -111,7 +111,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
             {
                 candidate = queue.poll();
                 candidates.push(candidate);
-                reducer.reduce(candidate.item);
+                reducer.reduce(candidate.idx, candidate.item);
             }
             while (queue.peek() != null && queue.peek().compareTo(candidate) == 0);
             return reducer.getReduced();
@@ -130,14 +130,16 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
     // Holds and is comparable by the head item of an iterator it owns
     protected static final class Candidate<In> implements Comparable<Candidate<In>>
     {
-        private final Iterator<In> iter;
-        private final Comparator<In> comp;
+        private final Iterator<? extends In> iter;
+        private final Comparator<? super In> comp;
+        private final int idx;
         private In item;
 
-        public Candidate(Iterator<In> iter, Comparator<In> comp)
+        public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
         {
             this.iter = iter;
             this.comp = comp;
+            this.idx = idx;
         }
 
         /** @return True if our iterator had an item, and it is now available */
@@ -170,7 +172,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
          * combine this object with the previous ones.
          * intermediate state is up to your implementation.
          */
-        public abstract void reduce(In current);
+        public abstract void reduce(int idx, In current);
 
         /** @return The last object computed by reduce */
         protected abstract Out getReduced();
@@ -202,7 +204,7 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem
             if (!source.hasNext())
                 return endOfData();
             reducer.onKeyChange();
-            reducer.reduce(source.next());
+            reducer.reduce(0, source.next());
             return reducer.getReduced();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
index 5448390..f2663bf 100644
--- a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -21,13 +21,13 @@ import java.net.InetAddress;
 import java.util.*;
 
 import com.datastax.driver.core.*;
+
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.CompactTables;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
@@ -100,27 +100,70 @@ public class NativeSSTableLoaderClient extends SSTableLoader.Client
     {
         Map<String, CFMetaData> tables = new HashMap<>();
 
-        String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense FROM %s.%s WHERE keyspace_name = '%s'",
+        String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense, default_validator FROM %s.%s WHERE keyspace_name = '%s'",
                                      SystemKeyspace.NAME,
                                      LegacySchemaTables.COLUMNFAMILIES,
                                      keyspace);
 
+
+        // The following is a slightly simplified but otherwise duplicated version of LegacySchemaTables.createTableFromTableRowAndColumnRows. It might
+        // be safer to have a simple wrapper of the driver ResultSet/Row implementing UntypedResultSet/UntypedResultSet.Row and reuse the original method.
         for (Row row : session.execute(query))
         {
             String name = row.getString("columnfamily_name");
             UUID id = row.getUUID("cf_id");
-            ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type"));
+            boolean isSuper = row.getString("type").toLowerCase().equals("super");
             AbstractType rawComparator = TypeParser.parse(row.getString("comparator"));
             AbstractType subComparator = row.isNull("subcomparator")
                                        ? null
                                        : TypeParser.parse(row.getString("subcomparator"));
             boolean isDense = row.getBool("is_dense");
-            CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator, subComparator),
-                                                                 isDense);
+            boolean isCompound = rawComparator instanceof CompositeType;
+
+            AbstractType<?> defaultValidator = TypeParser.parse(row.getString("default_validator"));
+            boolean isCounter =  defaultValidator instanceof CounterColumnType;
+            boolean isCQLTable = !isSuper && !isDense && isCompound;
 
-            tables.put(name, new CFMetaData(keyspace, name, type, comparator, id));
+            String columnsQuery = String.format("SELECT column_name, component_index, type, validator FROM %s.%s WHERE keyspace_name='%s' AND columnfamily_name='%s'",
+                                                SystemKeyspace.NAME,
+                                                LegacySchemaTables.COLUMNS,
+                                                keyspace,
+                                                name);
+
+            List<ColumnDefinition> defs = new ArrayList<>();
+            for (Row colRow : session.execute(columnsQuery))
+                defs.add(createDefinitionFromRow(colRow, keyspace, name, rawComparator, subComparator, isSuper, isCQLTable));
+
+            tables.put(name, CFMetaData.create(keyspace, name, id, isDense, isCompound, isSuper, isCounter, defs));
         }
 
         return tables;
     }
+
+    // A slightly simplified version of LegacySchemaTables.
+    private static ColumnDefinition createDefinitionFromRow(Row row,
+                                                            String keyspace,
+                                                            String table,
+                                                            AbstractType<?> rawComparator,
+                                                            AbstractType<?> rawSubComparator,
+                                                            boolean isSuper,
+                                                            boolean isCQLTable)
+    {
+        ColumnDefinition.Kind kind = LegacySchemaTables.deserializeKind(row.getString("type"));
+
+        Integer componentIndex = null;
+        if (!row.isNull("component_index"))
+            componentIndex = row.getInt("component_index");
+
+        // Note: we save the column name as a string, but we should not assume that it is a UTF8 name;
+        // we need to use the comparator's fromString method
+        AbstractType<?> comparator = isCQLTable
+                                   ? UTF8Type.instance
+                                   : CompactTables.columnDefinitionComparator(kind, isSuper, rawComparator, rawSubComparator);
+        ColumnIdentifier name = ColumnIdentifier.getInterned(comparator.fromString(row.getString("column_name")), comparator);
+
+        AbstractType<?> validator = TypeParser.parse(row.getString("validator"));
+
+        return new ColumnDefinition(keyspace, table, name, validator, null, null, null, componentIndex, kind);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/Sorting.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Sorting.java b/src/java/org/apache/cassandra/utils/Sorting.java
new file mode 100644
index 0000000..b1c0b46
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Sorting.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+public abstract class Sorting
+{
+    private Sorting() {}
+
+    /**
+     * Interface that allows sorting elements addressable by index, without actually requiring those
+     * elements to be part of a list/array.
+     */
+    public interface Sortable
+    {
+        /**
+         * The number of elements to sort.
+         */
+        public int size();
+
+        /**
+         * Compares the elements at index i and index j; a negative result means element i sorts before element j.
+         */
+        public int compare(int i, int j);
+
+        /**
+         * Swaps element i and j.
+         */
+        public void swap(int i, int j);
+    }
+
+    /**
+     * Sort a sortable.
+     *
+     * The actual algorithm is a direct adaptation of the standard sorting in golang
+     * at http://golang.org/src/pkg/sort/sort.go (comments included).
+     *
+     * It makes one call to data.size() to determine n, and O(n*log(n)) calls to
+     * data.compare() and data.swap(). The sort is not guaranteed to be stable.
+     */
+    public static void sort(Sortable data)
+    {
+        // Switch to heapsort if depth of 2*ceil(lg(n+1)) is reached.
+        int n = data.size();
+        int maxDepth = 0;
+        for (int i = n; i > 0; i >>= 1)
+            maxDepth++;
+        maxDepth *= 2;
+        quickSort(data, 0, n, maxDepth);
+    }
+
+    private static void insertionSort(Sortable data, int a, int b)
+    {
+        for (int i = a + 1; i < b; i++)
+            for(int j = i; j > a && data.compare(j, j-1) < 0; j--)
+                data.swap(j, j-1);
+    }
+
+    // siftDown implements the heap property on data[lo, hi).
+    // first is an offset into the array where the root of the heap lies.
+    private static void siftDown(Sortable data, int lo, int hi, int first)
+    {
+        int root = lo;
+        while (true)
+        {
+            int child = 2*root + 1;
+            if (child >= hi)
+                return;
+
+            if (child + 1 < hi && data.compare(first+child, first+child+1) < 0)
+                child++;
+
+            if (data.compare(first+root, first+child) >= 0)
+                return;
+
+            data.swap(first+root, first+child);
+            root = child;
+        }
+    }
+
+    private static void heapSort(Sortable data, int a, int b)
+    {
+        int first = a;
+        int lo = 0;
+        int hi = b - a;
+
+        // Build heap with greatest element at top.
+        for (int i = (hi - 1) / 2; i >= 0; i--)
+            siftDown(data, i, hi, first);
+
+        // Pop elements, largest first, into end of data.
+        for (int i = hi - 1; i >= 0; i--) {
+            data.swap(first, first+i);
+            siftDown(data, lo, i, first);
+        }
+    }
+
+    // Quicksort, following Bentley and McIlroy,
+    // ``Engineering a Sort Function,'' SP&E November 1993.
+
+    // medianOfThree moves the median of the three values data[a], data[b], data[c] into data[a].
+    private static void medianOfThree(Sortable data, int a, int b, int c)
+    {
+        int m0 = b;
+        int m1 = a;
+        int m2 = c;
+        // bubble sort on 3 elements
+        if (data.compare(m1, m0) < 0)
+            data.swap(m1, m0);
+        if (data.compare(m2, m1) < 0)
+            data.swap(m2, m1);
+        if (data.compare(m1, m0) < 0)
+            data.swap(m1, m0);
+        // now data[m0] <= data[m1] <= data[m2]
+    }
+
+    private static void swapRange(Sortable data, int a, int b, int n)
+    {
+        for (int i = 0; i < n; i++)
+            data.swap(a+i, b+i);
+    }
+
+    private static void doPivot(Sortable data, int lo, int hi, int[] result)
+    {
+        int m = lo + (hi-lo)/2; // Written like this to avoid integer overflow.
+        if (hi-lo > 40) {
+            // Tukey's ``Ninther,'' median of three medians of three.
+            int s = (hi - lo) / 8;
+            medianOfThree(data, lo, lo+s, lo+2*s);
+            medianOfThree(data, m, m-s, m+s);
+            medianOfThree(data, hi-1, hi-1-s, hi-1-2*s);
+        }
+        medianOfThree(data, lo, m, hi-1);
+
+        // Invariants are:
+        //    data[lo] = pivot (set up by the medianOfThree calls above)
+        //    data[lo <= i < a] = pivot
+        //    data[a <= i < b] < pivot
+        //    data[b <= i < c] is unexamined
+        //    data[c <= i < d] > pivot
+        //    data[d <= i < hi] = pivot
+        //
+        // Once b meets c, can swap the "= pivot" sections
+        // into the middle of the slice.
+        int pivot = lo;
+        int a = lo+1, b = lo+1, c = hi, d =hi;
+        while (true)
+        {
+            while (b < c)
+            {
+                int cmp = data.compare(b, pivot);
+                if (cmp < 0)  // data[b] < pivot
+                {
+                    b++;
+                }
+                else if (cmp == 0) // data[b] = pivot
+                {
+                    data.swap(a, b);
+                    a++;
+                    b++;
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            while (b < c)
+            {
+                int cmp = data.compare(pivot, c-1);
+                if (cmp < 0) // data[c-1] > pivot
+                {
+                    c--;
+                }
+                else if (cmp == 0) // data[c-1] = pivot
+                {
+                    data.swap(c-1, d-1);
+                    c--;
+                    d--;
+                }
+                else
+                {
+                    break;
+                }
+            }
+
+            if (b >= c)
+                break;
+
+            // data[b] > pivot; data[c-1] < pivot
+            data.swap(b, c-1);
+            b++;
+            c--;
+        }
+
+        int n = Math.min(b-a, a-lo);
+        swapRange(data, lo, b-n, n);
+
+        n = Math.min(hi-d, d-c);
+        swapRange(data, c, hi-n, n);
+
+        result[0] = lo + b - a;
+        result[1] = hi - (d - c);
+    }
+
+    private static void quickSort(Sortable data, int a, int b, int maxDepth)
+    {
+        int[] buffer = new int[2];
+
+        while (b-a > 7)
+        {
+            if (maxDepth == 0)
+            {
+                heapSort(data, a, b);
+                return;
+            }
+
+            maxDepth--;
+
+            doPivot(data, a, b, buffer);
+            int mlo = buffer[0];
+            int mhi = buffer[1];
+            // Avoiding recursion on the larger subproblem guarantees
+            // a stack depth of at most lg(b-a).
+            if (mlo-a < b-mhi)
+            {
+                quickSort(data, a, mlo, maxDepth);
+                a = mhi; // i.e., quickSort(data, mhi, b)
+            }
+            else
+            {
+                quickSort(data, mhi, b, maxDepth);
+                b = mlo; // i.e., quickSort(data, a, mlo)
+            }
+        }
+
+        if (b-a > 1)
+            insertionSort(data, a, b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTree.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 1145d12..bf68ffa 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -26,8 +26,6 @@ import java.util.Queue;
 
 import org.apache.cassandra.utils.ObjectSizes;
 
-import static org.apache.cassandra.utils.btree.UpdateFunction.NoOp;
-
 public class BTree
 {
     /**
@@ -79,21 +77,18 @@ public class BTree
         return EMPTY_LEAF;
     }
 
-    public static <V> Object[] build(Collection<V> source, Comparator<V> comparator, boolean sorted, UpdateFunction<V> updateF)
+    public static <C, K extends C, V extends C> Object[] build(Collection<K> source, UpdateFunction<K, V> updateF)
     {
-        return build(source, source.size(), comparator, sorted, updateF);
+        return build(source, source.size(), updateF);
     }
 
     /**
      * Creates a BTree containing all of the objects in the provided collection
      *
      * @param source     the items to build the tree with
-     * @param comparator the comparator that defines the ordering over the items in the tree
-     * @param sorted     if false, the collection will be copied and sorted to facilitate construction
-     * @param <V>
      * @return
      */
-    public static <V> Object[] build(Iterable<V> source, int size, Comparator<V> comparator, boolean sorted, UpdateFunction<V> updateF)
+    public static <C, K extends C, V extends C> Object[] build(Iterable<K> source, int size, UpdateFunction<K, V> updateF)
     {
         if (size < FAN_FACTOR)
         {
@@ -101,27 +96,13 @@ public class BTree
             V[] values = (V[]) new Object[size + (size & 1)];
             {
                 int i = 0;
-                for (V v : source)
-                    values[i++] = v;
-            }
-
-            // inline sorting since we're already calling toArray
-            if (!sorted)
-                Arrays.sort(values, 0, size, comparator);
-
-            // if updateF is specified
-            if (updateF != null)
-            {
-                for (int i = 0 ; i < size ; i++)
-                    values[i] = updateF.apply(values[i]);
-                updateF.allocated(ObjectSizes.sizeOfArray(values));
+                for (K k : source)
+                    values[i++] = updateF.apply(k);
             }
+            updateF.allocated(ObjectSizes.sizeOfArray(values));
             return values;
         }
 
-        if (!sorted)
-            source = sorted(source, comparator, size);
-
         Queue<Builder> queue = modifier.get();
         Builder builder = queue.poll();
         if (builder == null)
@@ -131,28 +112,12 @@ public class BTree
         return btree;
     }
 
-    /**
-     * Returns a new BTree with the provided set inserting/replacing as necessary any equal items
-     *
-     * @param btree              the tree to update
-     * @param comparator         the comparator that defines the ordering over the items in the tree
-     * @param updateWith         the items to either insert / update
-     * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction
-     * @param <V>
-     * @return
-     */
-    public static <V> Object[] update(Object[] btree, Comparator<V> comparator, Collection<V> updateWith, boolean updateWithIsSorted)
-    {
-        return update(btree, comparator, updateWith, updateWithIsSorted, NoOp.<V>instance());
-    }
-
-    public static <V> Object[] update(Object[] btree,
-                                      Comparator<V> comparator,
-                                      Collection<V> updateWith,
-                                      boolean updateWithIsSorted,
-                                      UpdateFunction<V> updateF)
+    public static <C, K extends C, V extends C> Object[] update(Object[] btree,
+                                                                Comparator<C> comparator,
+                                                                Collection<K> updateWith,
+                                                                UpdateFunction<K, V> updateF)
     {
-        return update(btree, comparator, updateWith, updateWith.size(), updateWithIsSorted, updateF);
+        return update(btree, comparator, updateWith, updateWith.size(), updateF);
     }
 
     /**
@@ -161,23 +126,19 @@ public class BTree
      * @param btree              the tree to update
      * @param comparator         the comparator that defines the ordering over the items in the tree
      * @param updateWith         the items to either insert / update
-     * @param updateWithIsSorted if false, updateWith will be copied and sorted to facilitate construction
+     * @param updateWithLength   the number of elements in updateWith
      * @param updateF            the update function to apply to any pairs we are swapping, and maybe abort early
      * @param <V>
      * @return
      */
-    public static <V> Object[] update(Object[] btree,
-                                      Comparator<V> comparator,
-                                      Iterable<V> updateWith,
-                                      int updateWithLength,
-                                      boolean updateWithIsSorted,
-                                      UpdateFunction<V> updateF)
+    public static <C, K extends C, V extends C> Object[] update(Object[] btree,
+                                                                Comparator<C> comparator,
+                                                                Iterable<K> updateWith,
+                                                                int updateWithLength,
+                                                                UpdateFunction<K, V> updateF)
     {
         if (btree.length == 0)
-            return build(updateWith, updateWithLength, comparator, updateWithIsSorted, updateF);
-
-        if (!updateWithIsSorted)
-            updateWith = sorted(updateWith, comparator, updateWithLength);
+            return build(updateWith, updateWithLength, updateF);
 
         Queue<Builder> queue = modifier.get();
         Builder builder = queue.poll();
@@ -361,17 +322,6 @@ public class BTree
         }
     };
 
-    // return a sorted collection
-    private static <V> Collection<V> sorted(Iterable<V> source, Comparator<V> comparator, int size)
-    {
-        V[] vs = (V[]) new Object[size];
-        int i = 0;
-        for (V v : source)
-            vs[i++] = v;
-        Arrays.sort(vs, comparator);
-        return Arrays.asList(vs);
-    }
-
     /** simple static wrapper to calls to cmp.compare() which checks if either a or b are Special (i.e. represent an infinity) */
     // TODO : cheaper to check for POSITIVE/NEGATIVE infinity in callers, rather than here
     static <V> int compare(Comparator<V> cmp, Object a, Object b)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index 7a83238..403f234 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -25,43 +25,76 @@ import static org.apache.cassandra.utils.btree.BTree.getKeyEnd;
 
 public class BTreeSearchIterator<CK, K extends CK, V> extends Path implements SearchIterator<K, V>
 {
-
     final Comparator<CK> comparator;
-    public BTreeSearchIterator(Object[] btree, Comparator<CK> comparator)
+    final boolean forwards;
+
+    public BTreeSearchIterator(Object[] btree, Comparator<CK> comparator, boolean forwards)
     {
         init(btree);
+        if (!forwards)
+            this.indexes[0] = (byte)(getKeyEnd(path[0]) - 1);
         this.comparator = comparator;
+        this.forwards = forwards;
     }
 
     public V next(K target)
     {
-        while (depth > 0)
+        // We could probably avoid some of the repetition but leaving that for later.
+        if (forwards)
         {
-            byte successorParentDepth = findSuccessorParentDepth();
-            if (successorParentDepth < 0)
-                break; // we're in last section of tree, so can only search down
-            int successorParentIndex = indexes[successorParentDepth] + 1;
-            Object[] successParentNode = path[successorParentDepth];
-            Object successorParentKey = successParentNode[successorParentIndex];
-            int c = BTree.compare(comparator, target, successorParentKey);
-            if (c < 0)
-                break;
-            if (c == 0)
+            while (depth > 0)
             {
+                byte successorParentDepth = findSuccessorParentDepth();
+                if (successorParentDepth < 0)
+                    break; // we're in last section of tree, so can only search down
+                int successorParentIndex = indexes[successorParentDepth] + 1;
+                Object[] successParentNode = path[successorParentDepth];
+                Object successorParentKey = successParentNode[successorParentIndex];
+                int c = BTree.compare(comparator, target, successorParentKey);
+                if (c < 0)
+                    break;
+                if (c == 0)
+                {
+                    depth = successorParentDepth;
+                    indexes[successorParentDepth]++;
+                    return (V) successorParentKey;
+                }
                 depth = successorParentDepth;
                 indexes[successorParentDepth]++;
-                return (V) successorParentKey;
             }
-            depth = successorParentDepth;
-            indexes[successorParentDepth]++;
+            if (find(comparator, target, Op.CEIL, true))
+                return (V) currentKey();
+        }
+        else
+        {
+            while (depth > 0)
+            {
+                byte predecessorParentDepth = findPredecessorParentDepth();
+                if (predecessorParentDepth < 0)
+                    break; // we're in the first (leftmost) section of the tree, so can only search down
+                int predecessorParentIndex = indexes[predecessorParentDepth] - 1;
+                Object[] predecessParentNode = path[predecessorParentDepth];
+                Object predecessorParentKey = predecessParentNode[predecessorParentIndex];
+                int c = BTree.compare(comparator, target, predecessorParentKey);
+                if (c > 0)
+                    break;
+                if (c == 0)
+                {
+                    depth = predecessorParentDepth;
+                    indexes[predecessorParentDepth]--;
+                    return (V) predecessorParentKey;
+                }
+                depth = predecessorParentDepth;
+                indexes[predecessorParentDepth]--;
+            }
+            if (find(comparator, target, Op.FLOOR, false))
+                return (V) currentKey();
         }
-        if (find(comparator, target, Op.CEIL, true))
-            return (V) currentKey();
         return null;
     }
 
     public boolean hasNext()
     {
-        return depth != 0 || indexes[0] != getKeyEnd(path[0]);
+        return depth != 0 || indexes[0] != (forwards ? getKeyEnd(path[0]) : -1);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java b/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
deleted file mode 100644
index d80b32e..0000000
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSet.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.btree;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.SortedSet;
-
-public class BTreeSet<V> implements NavigableSet<V>
-{
-    protected final Comparator<V> comparator;
-    protected final Object[] tree;
-
-    public BTreeSet(Object[] tree, Comparator<V> comparator)
-    {
-        this.tree = tree;
-        this.comparator = comparator;
-    }
-
-    public BTreeSet<V> update(Collection<V> updateWith, boolean isSorted)
-    {
-        return new BTreeSet<>(BTree.update(tree, comparator, updateWith, isSorted, UpdateFunction.NoOp.<V>instance()), comparator);
-    }
-
-    @Override
-    public Comparator<? super V> comparator()
-    {
-        return comparator;
-    }
-
-    protected Cursor<V, V> slice(boolean forwards, boolean permitInversion)
-    {
-        return BTree.slice(tree, forwards);
-    }
-
-    @Override
-    public int size()
-    {
-        return slice(true, false).count();
-    }
-
-    @Override
-    public boolean isEmpty()
-    {
-        return slice(true, false).hasNext();
-    }
-
-    @Override
-    public Iterator<V> iterator()
-    {
-        return slice(true, true);
-    }
-
-    @Override
-    public Iterator<V> descendingIterator()
-    {
-        return slice(false, true);
-    }
-
-    @Override
-    public Object[] toArray()
-    {
-        return toArray(new Object[0]);
-    }
-
-    @Override
-    public <T> T[] toArray(T[] a)
-    {
-        int size = size();
-        if (a.length < size)
-            a = Arrays.copyOf(a, size);
-        int i = 0;
-        for (V v : this)
-            a[i++] = (T) v;
-        return a;
-    }
-
-    @Override
-    public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
-    {
-        return new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive);
-    }
-
-    @Override
-    public NavigableSet<V> headSet(V toElement, boolean inclusive)
-    {
-        return new BTreeRange<>(tree, comparator, null, true, toElement, inclusive);
-    }
-
-    @Override
-    public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
-    {
-        return new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true);
-    }
-
-    @Override
-    public SortedSet<V> subSet(V fromElement, V toElement)
-    {
-        return subSet(fromElement, true, toElement, false);
-    }
-
-    @Override
-    public SortedSet<V> headSet(V toElement)
-    {
-        return headSet(toElement, false);
-    }
-
-    @Override
-    public SortedSet<V> tailSet(V fromElement)
-    {
-        return tailSet(fromElement, true);
-    }
-
-    @Override
-    public V first()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V last()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean addAll(Collection<? extends V> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean retainAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean removeAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void clear()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V pollFirst()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V pollLast()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean add(V v)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean remove(Object o)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V lower(V v)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V floor(V v)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V ceiling(V v)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public V higher(V v)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean contains(Object o)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public boolean containsAll(Collection<?> c)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public NavigableSet<V> descendingSet()
-    {
-        return new BTreeRange<>(this.tree, this.comparator).descendingSet();
-    }
-
-    public static class BTreeRange<V> extends BTreeSet<V> implements NavigableSet<V>
-    {
-
-        protected final V lowerBound, upperBound;
-        protected final boolean inclusiveLowerBound, inclusiveUpperBound;
-
-        BTreeRange(Object[] tree, Comparator<V> comparator)
-        {
-            this(tree, comparator, null, true, null, true);
-        }
-
-        BTreeRange(BTreeRange<V> from)
-        {
-            this(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound);
-        }
-
-        BTreeRange(Object[] tree, Comparator<V> comparator, V lowerBound, boolean inclusiveLowerBound, V upperBound, boolean inclusiveUpperBound)
-        {
-            super(tree, comparator);
-            this.lowerBound = lowerBound;
-            this.upperBound = upperBound;
-            this.inclusiveLowerBound = inclusiveLowerBound;
-            this.inclusiveUpperBound = inclusiveUpperBound;
-        }
-
-        // narrowing range constructor - makes this the intersection of the two ranges over the same tree b
-        BTreeRange(BTreeRange<V> a, BTreeRange<V> b)
-        {
-            super(a.tree, a.comparator);
-            assert a.tree == b.tree;
-            final BTreeRange<V> lb, ub;
-
-            if (a.lowerBound == null)
-            {
-                lb = b;
-            }
-            else if (b.lowerBound == null)
-            {
-                lb = a;
-            }
-            else
-            {
-                int c = comparator.compare(a.lowerBound, b.lowerBound);
-                if (c < 0)
-                    lb = b;
-                else if (c > 0)
-                    lb = a;
-                else if (!a.inclusiveLowerBound)
-                    lb = a;
-                else
-                    lb = b;
-            }
-
-            if (a.upperBound == null)
-            {
-                ub = b;
-            }
-            else if (b.upperBound == null)
-            {
-                ub = a;
-            }
-            else
-            {
-                int c = comparator.compare(b.upperBound, a.upperBound);
-                if (c < 0)
-                    ub = b;
-                else if (c > 0)
-                    ub = a;
-                else if (!a.inclusiveUpperBound)
-                    ub = a;
-                else
-                    ub = b;
-            }
-
-            lowerBound = lb.lowerBound;
-            inclusiveLowerBound = lb.inclusiveLowerBound;
-            upperBound = ub.upperBound;
-            inclusiveUpperBound = ub.inclusiveUpperBound;
-        }
-
-        @Override
-        protected Cursor<V, V> slice(boolean forwards, boolean permitInversion)
-        {
-            return BTree.slice(tree, comparator, lowerBound, inclusiveLowerBound, upperBound, inclusiveUpperBound, forwards);
-        }
-
-        @Override
-        public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
-        {
-            return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, fromElement, fromInclusive, toElement, toInclusive));
-        }
-
-        @Override
-        public NavigableSet<V> headSet(V toElement, boolean inclusive)
-        {
-            return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, lowerBound, true, toElement, inclusive));
-        }
-
-        @Override
-        public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
-        {
-            return new BTreeRange<>(this, new BTreeRange<>(tree, comparator, fromElement, inclusive, null, true));
-        }
-
-        @Override
-        public NavigableSet<V> descendingSet()
-        {
-            return new BTreeDescRange<>(this);
-        }
-    }
-
-    public static class BTreeDescRange<V> extends BTreeRange<V>
-    {
-        BTreeDescRange(BTreeRange<V> from)
-        {
-            super(from.tree, from.comparator, from.lowerBound, from.inclusiveLowerBound, from.upperBound, from.inclusiveUpperBound);
-        }
-
-        @Override
-        protected Cursor<V, V> slice(boolean forwards, boolean permitInversion)
-        {
-            return super.slice(permitInversion ? !forwards : forwards, false);
-        }
-
-        @Override
-        public NavigableSet<V> subSet(V fromElement, boolean fromInclusive, V toElement, boolean toInclusive)
-        {
-            return super.subSet(toElement, toInclusive, fromElement, fromInclusive).descendingSet();
-        }
-
-        @Override
-        public NavigableSet<V> headSet(V toElement, boolean inclusive)
-        {
-            return super.tailSet(toElement, inclusive).descendingSet();
-        }
-
-        @Override
-        public NavigableSet<V> tailSet(V fromElement, boolean inclusive)
-        {
-            return super.headSet(fromElement, inclusive).descendingSet();
-        }
-
-        @Override
-        public NavigableSet<V> descendingSet()
-        {
-            return new BTreeRange<>(this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/Builder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/Builder.java b/src/java/org/apache/cassandra/utils/btree/Builder.java
index ab78016..b835554 100644
--- a/src/java/org/apache/cassandra/utils/btree/Builder.java
+++ b/src/java/org/apache/cassandra/utils/btree/Builder.java
@@ -54,14 +54,14 @@ final class Builder
      * we assume @param source has been sorted, e.g. by BTree.update, so the update of each key resumes where
      * the previous left off.
      */
-    public <V> Object[] update(Object[] btree, Comparator<V> comparator, Iterable<V> source, UpdateFunction<V> updateF)
+    public <C, K extends C, V extends C> Object[] update(Object[] btree, Comparator<C> comparator, Iterable<K> source, UpdateFunction<K, V> updateF)
     {
         assert updateF != null;
 
         NodeBuilder current = rootBuilder;
         current.reset(btree, POSITIVE_INFINITY, updateF, comparator);
 
-        for (V key : source)
+        for (K key : source)
         {
             while (true)
             {
@@ -96,7 +96,7 @@ final class Builder
         return r;
     }
 
-    public <V> Object[] build(Iterable<V> source, UpdateFunction<V> updateF, int size)
+    public <C, K extends C, V extends C> Object[] build(Iterable<K> source, UpdateFunction<K, V> updateF, int size)
     {
         assert updateF != null;
 
@@ -107,7 +107,7 @@ final class Builder
             current = current.ensureChild();
 
         current.reset(EMPTY_LEAF, POSITIVE_INFINITY, updateF, null);
-        for (V key : source)
+        for (K key : source)
             current.addNewKey(updateF.apply(key));
 
         current = current.ascendToRoot();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/Path.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/Path.java b/src/java/org/apache/cassandra/utils/btree/Path.java
index b1b0e03..9b6789c 100644
--- a/src/java/org/apache/cassandra/utils/btree/Path.java
+++ b/src/java/org/apache/cassandra/utils/btree/Path.java
@@ -104,8 +104,7 @@ public class Path<V>
         // search
 
         Object[] node = path[depth];
-        int lb = indexes[depth];
-        assert lb == 0 || forwards;
+        int lb = forwards ? indexes[depth] : 0;
         pop();
 
         if (target instanceof BTree.Special)
@@ -223,6 +222,20 @@ public class Path<V>
         return -1;
     }
 
+    byte findPredecessorParentDepth()
+    {
+        byte depth = this.depth;
+        depth--;
+        while (depth >= 0)
+        {
+            int ub = indexes[depth] - 1;
+            if (ub >= 0)
+                return depth;
+            depth--;
+        }
+        return -1;
+    }
+
     // move to the next key in the tree
     void successor()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
index 9f45031..93c02ae 100644
--- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
+++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
@@ -22,17 +22,15 @@ import com.google.common.base.Function;
 /**
  * An interface defining a function to be applied to both the object we are replacing in a BTree and
  * the object that is intended to replace it, returning the object to actually replace it.
- *
- * @param <V>
  */
-public interface UpdateFunction<V> extends Function<V, V>
+public interface UpdateFunction<K, V> extends Function<K, V>
 {
     /**
      * @param replacing the value in the original tree we have matched
      * @param update the value in the updating collection that matched
      * @return the value to insert into the new tree
      */
-    V apply(V replacing, V update);
+    V apply(V replacing, K update);
 
     /**
      * @return true if we should fail the update
@@ -44,37 +42,4 @@ public interface UpdateFunction<V> extends Function<V, V>
      */
     void allocated(long heapSize);
 
-    public static final class NoOp<V> implements UpdateFunction<V>
-    {
-
-        private static final NoOp INSTANCE = new NoOp();
-        public static <V> NoOp<V> instance()
-        {
-            return INSTANCE;
-        }
-        
-        private NoOp()
-        {
-        }
-
-        public V apply(V replacing, V update)
-        {
-            return update;
-        }
-
-        public V apply(V update)
-        {
-            return update;
-        }
-
-        public boolean abortEarly()
-        {
-            return false;
-        }
-
-        public void allocated(long heapSize)
-        {
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index baecb34..e80faca 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -100,6 +100,11 @@ public class Accumulator<E> implements Iterable<E>
         return presentCount;
     }
 
+    public int capacity()
+    {
+        return values.length;
+    }
+
     public Iterator<E> iterator()
     {
         return new Iterator<E>()


Mime
View raw message