cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [17/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:41 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index b0a8f6b..e7560c2 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,42 +17,45 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-public abstract class AbstractSSTableSimpleWriter implements Closeable
+/**
+ * Base class for the sstable writers used by CQLSSTableWriter.
+ */
+abstract class AbstractSSTableSimpleWriter implements Closeable
 {
     protected final File directory;
     protected final CFMetaData metadata;
-    protected DecoratedKey currentKey;
-    protected ColumnFamily columnFamily;
-    protected ByteBuffer currentSuperColumn;
-    protected final CounterId counterid = CounterId.generate();
-    private SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
+    protected final PartitionColumns columns;
+    protected SSTableFormat.Type formatType = DatabaseDescriptor.getSSTableFormat();
     protected static AtomicInteger generation = new AtomicInteger(0);
 
-
-    public AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
+    protected AbstractSSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
     {
         this.metadata = metadata;
         this.directory = directory;
+        this.columns = columns;
         DatabaseDescriptor.setPartitioner(partitioner);
     }
 
@@ -61,12 +64,15 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
         this.formatType = type;
     }
 
-    protected SSTableWriter getWriter()
+    protected SSTableWriter createWriter()
     {
-        return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType), 0, ActiveRepairService.UNREPAIRED_SSTABLE);
+        return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, formatType),
+                                    0,
+                                    ActiveRepairService.UNREPAIRED_SSTABLE,
+                                    new SerializationHeader(metadata, columns, RowStats.NO_STATS));
     }
 
-    protected static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
+    private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)
     {
         int maxGen = getNextGeneration(directory, columnFamily);
         return new Descriptor(directory, keyspace, columnFamily, maxGen + 1, Descriptor.Type.TEMP, fmt);
@@ -102,108 +108,11 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
     }
 
     /**
-     * Start a new row whose key is {@code key}.
-     * @param key the row key
-     */
-    public void newRow(ByteBuffer key) throws IOException
-    {
-        if (currentKey != null && !columnFamily.isEmpty())
-            writeRow(currentKey, columnFamily);
-
-        currentKey = DatabaseDescriptor.getPartitioner().decorateKey(key);
-        columnFamily = getColumnFamily();
-    }
-
-    /**
-     * Start a new super column with name {@code name}.
-     * @param name the name for the super column
+     * Returns a PartitionUpdate suitable to write on this writer for the provided key.
+     *
+     * @param key the partition key for which the returned update will be.
+     * @return an update on partition {@code key} that is tied to this writer.
      */
-    public void newSuperColumn(ByteBuffer name)
-    {
-        if (!columnFamily.metadata().isSuper())
-            throw new IllegalStateException("Cannot add a super column to a standard table");
-
-        currentSuperColumn = name;
-    }
-
-    protected void addColumn(Cell cell) throws IOException
-    {
-        if (columnFamily.metadata().isSuper())
-        {
-            if (currentSuperColumn == null)
-                throw new IllegalStateException("Trying to add a cell to a super column family, but no super cell has been started.");
-
-            cell = cell.withUpdatedName(columnFamily.getComparator().makeCellName(currentSuperColumn, cell.name().toByteBuffer()));
-        }
-        columnFamily.addColumn(cell);
-    }
-
-    /**
-     * Insert a new "regular" column to the current row (and super column if applicable).
-     * @param name the column name
-     * @param value the column value
-     * @param timestamp the column timestamp
-     */
-    public void addColumn(ByteBuffer name, ByteBuffer value, long timestamp) throws IOException
-    {
-        addColumn(new BufferCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp));
-    }
-
-    /**
-     * Insert a new expiring column to the current row (and super column if applicable).
-     * @param name the column name
-     * @param value the column value
-     * @param timestamp the column timestamp
-     * @param ttl the column time to live in seconds
-     * @param expirationTimestampMS the local expiration timestamp in milliseconds. This is the server time timestamp used for actually
-     * expiring the column, and as a consequence should be synchronized with the cassandra servers time. If {@code timestamp} represents
-     * the insertion time in microseconds (which is not required), this should be {@code (timestamp / 1000) + (ttl * 1000)}.
-     */
-    public void addExpiringColumn(ByteBuffer name, ByteBuffer value, long timestamp, int ttl, long expirationTimestampMS) throws IOException
-    {
-        addColumn(new BufferExpiringCell(metadata.comparator.cellFromByteBuffer(name), value, timestamp, ttl, (int)(expirationTimestampMS / 1000)));
-    }
-
-    /**
-     * Insert a new counter column to the current row (and super column if applicable).
-     * @param name the column name
-     * @param value the value of the counter
-     */
-    public void addCounterColumn(ByteBuffer name, long value) throws IOException
-    {
-        addColumn(new BufferCounterCell(metadata.comparator.cellFromByteBuffer(name),
-                                        CounterContext.instance().createGlobal(counterid, 1L, value),
-                                        System.currentTimeMillis()));
-    }
-
-    /**
-     * Package protected for use by AbstractCQLSSTableWriter.
-     * Not meant to be exposed publicly.
-     */
-    ColumnFamily currentColumnFamily()
-    {
-        return columnFamily;
-    }
-
-    /**
-     * Package protected for use by AbstractCQLSSTableWriter.
-     * Not meant to be exposed publicly.
-     */
-    DecoratedKey currentKey()
-    {
-        return currentKey;
-    }
-
-    /**
-     * Package protected for use by AbstractCQLSSTableWriter.
-     * Not meant to be exposed publicly.
-     */
-    boolean shouldStartNewRow() throws IOException
-    {
-        return currentKey == null;
-    }
-
-    protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
-
-    protected abstract ColumnFamily getColumnFamily() throws IOException;
+    abstract PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException;
 }
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 4181ed0..d3f3380 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -25,18 +25,13 @@ import java.util.*;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
 import org.apache.cassandra.cql3.statements.ParsedStatement;
 import org.apache.cassandra.cql3.statements.UpdateStatement;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -208,26 +203,28 @@ public class CQLSSTableWriter implements Closeable
 
         QueryOptions options = QueryOptions.forInternalCalls(null, values);
         List<ByteBuffer> keys = insert.buildPartitionKeyNames(options);
-        Composite clusteringPrefix = insert.createClusteringPrefix(options);
+        CBuilder clustering = insert.createClustering(options);
 
         long now = System.currentTimeMillis() * 1000;
+        // Note that we ask indexes to not validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
+        // and that forces a lot of initialization that we don't want.
         UpdateParameters params = new UpdateParameters(insert.cfm,
                                                        options,
                                                        insert.getTimestamp(now, options),
                                                        insert.getTimeToLive(options),
-                                                       Collections.<ByteBuffer, CQL3Row>emptyMap());
+                                                       Collections.<DecoratedKey, Partition>emptyMap(),
+                                                       false);
 
         try
         {
             for (ByteBuffer key : keys)
             {
-                if (writer.shouldStartNewRow() || !key.equals(writer.currentKey().getKey()))
-                    writer.newRow(key);
-                insert.addUpdateForKey(writer.currentColumnFamily(), key, clusteringPrefix, params, false);
+                DecoratedKey dk = DatabaseDescriptor.getPartitioner().decorateKey(key);
+                insert.addUpdateForKey(writer.getUpdateFor(dk), clustering, params);
             }
             return this;
         }
-        catch (BufferedWriter.SyncException e)
+        catch (SSTableSimpleUnsortedWriter.SyncException e)
         {
             // If we use a BufferedWriter and had a problem writing to disk, the IOException has been
             // wrapped in a SyncException (see BufferedWriter below). We want to extract that IOE.
@@ -348,7 +345,7 @@ public class CQLSSTableWriter implements Closeable
             {
                 synchronized (CQLSSTableWriter.class)
                 {
-                    this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData().rebuild();
+                    this.schema = getStatement(schema, CreateTableStatement.class, "CREATE TABLE").left.getCFMetaData();
 
                     // We need to register the keyspace/table metadata through Schema, otherwise we won't be able to properly
                     // build the insert statement in using().
@@ -373,7 +370,7 @@ public class CQLSSTableWriter implements Closeable
         /**
          * Creates the keyspace with the specified table.
          *
-         * @param the table the table that must be created.
+         * @param table the table that must be created.
          */
         private static void createKeyspaceWithTable(CFMetaData table)
         {
@@ -520,8 +517,8 @@ public class CQLSSTableWriter implements Closeable
                 throw new IllegalStateException("No insert statement specified, you should provide an insert statement through using()");
 
             AbstractSSTableSimpleWriter writer = sorted
-                                               ? new SSTableSimpleWriter(directory, schema, partitioner)
-                                               : new BufferedWriter(directory, schema, partitioner, bufferSizeInMB);
+                                               ? new SSTableSimpleWriter(directory, schema, partitioner, insert.updatedColumns())
+                                               : new SSTableSimpleUnsortedWriter(directory, schema, partitioner, insert.updatedColumns(), bufferSizeInMB);
 
             if (formatType != null)
                 writer.setSSTableFormatType(formatType);
@@ -529,81 +526,4 @@ public class CQLSSTableWriter implements Closeable
             return new CQLSSTableWriter(writer, insert, boundNames);
         }
     }
-
-    /**
-     * CQLSSTableWriter doesn't use the method addColumn() from AbstractSSTableSimpleWriter.
-     * Instead, it adds cells directly to the ColumnFamily the latter exposes. But this means
-     * that the sync() method of SSTableSimpleUnsortedWriter is not called (at least not for
-     * each CQL row, so adding many rows to the same partition can buffer too much data in
-     * memory - #7360). So we create a slightly modified SSTableSimpleUnsortedWriter that uses
-     * a tweaked ColumnFamily object that calls back the proper method after each added cell
-     * so we sync when we should.
-     */
-    private static class BufferedWriter extends SSTableSimpleUnsortedWriter
-    {
-        private boolean needsSync = false;
-
-        public BufferedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB)
-        {
-            super(directory, metadata, partitioner, bufferSizeInMB);
-        }
-
-        @Override
-        protected ColumnFamily createColumnFamily()
-        {
-            return new ArrayBackedSortedColumns(metadata, false)
-            {
-                @Override
-                public void addColumn(Cell cell)
-                {
-                    super.addColumn(cell);
-                    try
-                    {
-                        countColumn(cell);
-                    }
-                    catch (IOException e)
-                    {
-                        // addColumn does not throw IOException but we want to report this to the user,
-                        // so wrap it in a temporary RuntimeException that we'll catch in rawAddRow above.
-                        throw new SyncException(e);
-                    }
-                }
-            };
-        }
-
-        @Override
-        protected void replaceColumnFamily() throws IOException
-        {
-            needsSync = true;
-        }
-
-        /**
-         * If we have marked that the column family is being replaced, when we start the next row,
-         * we should sync out the previous partition and create a new row based on the current value.
-         */
-        @Override
-        boolean shouldStartNewRow() throws IOException
-        {
-            if (needsSync)
-            {
-                needsSync = false;
-                super.sync();
-                return true;
-            }
-            return super.shouldStartNewRow();
-        }
-
-        protected void addColumn(Cell cell) throws IOException
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        static class SyncException extends RuntimeException
-        {
-            SyncException(IOException ioe)
-            {
-                super(ioe);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java b/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
deleted file mode 100644
index 846634a..0000000
--- a/src/java/org/apache/cassandra/io/sstable/ColumnNameHelper.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.AbstractType;
-
-import static org.apache.cassandra.utils.ByteBufferUtil.minimalBufferFor;
-
-public class ColumnNameHelper
-{
-    private static List<ByteBuffer> maybeGrow(List<ByteBuffer> l, int size)
-    {
-        if (l.size() >= size)
-            return l;
-
-        List<ByteBuffer> nl = new ArrayList<>(size);
-        nl.addAll(l);
-        for (int i = l.size(); i < size; i++)
-            nl.add(null);
-        return nl;
-    }
-
-    private static List<ByteBuffer> getComponents(Composite prefix, int size)
-    {
-        List<ByteBuffer> l = new ArrayList<>(size);
-        for (int i = 0; i < size; i++)
-            l.add(prefix.get(i));
-        return l;
-    }
-
-    /**
-     * finds the max cell name component(s)
-     *
-     * Note that this method *can modify maxSeen*.
-     *
-     * @param maxSeen the max columns seen so far
-     * @param candidate the candidate column(s)
-     * @param comparator the comparator to use
-     * @return a list with the max column(s)
-     */
-    public static List<ByteBuffer> maxComponents(List<ByteBuffer> maxSeen, Composite candidate, CellNameType comparator)
-    {
-        // For a cell name, no reason to look more than the clustering prefix
-        // (and comparing the collection element would actually crash)
-        int size = Math.min(candidate.size(), comparator.clusteringPrefixSize());
-
-        if (maxSeen.isEmpty())
-            return getComponents(candidate, size);
-
-        // In most case maxSeen is big enough to hold the result so update it in place in those cases
-        maxSeen = maybeGrow(maxSeen, size);
-
-        for (int i = 0; i < size; i++)
-            maxSeen.set(i, max(maxSeen.get(i), candidate.get(i), comparator.subtype(i)));
-
-        return maxSeen;
-    }
-
-    /**
-     * finds the min cell name component(s)
-     *
-     * Note that this method *can modify maxSeen*.
-     *
-     * @param minSeen the max columns seen so far
-     * @param candidate the candidate column(s)
-     * @param comparator the comparator to use
-     * @return a list with the min column(s)
-     */
-    public static List<ByteBuffer> minComponents(List<ByteBuffer> minSeen, Composite candidate, CellNameType comparator)
-    {
-        // For a cell name, no reason to look more than the clustering prefix
-        // (and comparing the collection element would actually crash)
-        int size = Math.min(candidate.size(), comparator.clusteringPrefixSize());
-
-        if (minSeen.isEmpty())
-            return getComponents(candidate, size);
-
-        // In most case maxSeen is big enough to hold the result so update it in place in those cases
-        minSeen = maybeGrow(minSeen, size);
-
-        for (int i = 0; i < size; i++)
-            minSeen.set(i, min(minSeen.get(i), candidate.get(i), comparator.subtype(i)));
-
-        return minSeen;
-    }
-
-    /**
-     * return the min column
-     *
-     * note that comparator should not be of CompositeType!
-     *
-     * @param b1 lhs
-     * @param b2 rhs
-     * @param comparator the comparator to use
-     * @return the smallest column according to comparator
-     */
-    private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
-    {
-        if (b1 == null)
-            return b2;
-        if (b2 == null)
-            return b1;
-
-        if (comparator.compare(b1, b2) >= 0)
-            return b2;
-        return b1;
-    }
-
-    /**
-     * return the max column
-     *
-     * note that comparator should not be of CompositeType!
-     *
-     * @param b1 lhs
-     * @param b2 rhs
-     * @param comparator the comparator to use
-     * @return the biggest column according to comparator
-     */
-    private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
-    {
-        if (b1 == null)
-            return b2;
-        if (b2 == null)
-            return b1;
-
-        if (comparator.compare(b1, b2) >= 0)
-            return b1;
-        return b2;
-    }
-
-    /**
-     * Merge 2 lists of min cell name components.
-     *
-     * @param minColumnNames lhs
-     * @param candidates rhs
-     * @param comparator comparator to use
-     * @return a list with smallest column names according to (sub)comparator
-     */
-    public static List<ByteBuffer> mergeMin(List<ByteBuffer> minColumnNames, List<ByteBuffer> candidates, CellNameType comparator)
-    {
-        if (minColumnNames.isEmpty())
-            return minimalBuffersFor(candidates);
-
-        if (candidates.isEmpty())
-            return minColumnNames;
-
-        List<ByteBuffer> biggest = minColumnNames.size() > candidates.size() ? minColumnNames : candidates;
-        List<ByteBuffer> smallest = minColumnNames.size() > candidates.size() ? candidates : minColumnNames;
-
-        // We want to always copy the smallest list, and maybeGrow does it only if it's actually smaller
-        List<ByteBuffer> retList = smallest.size() == biggest.size()
-                                 ? new ArrayList<>(smallest)
-                                 : maybeGrow(smallest, biggest.size());
-
-        for (int i = 0; i < biggest.size(); i++)
-            retList.set(i, minimalBufferFor(min(retList.get(i), biggest.get(i), comparator.subtype(i))));
-
-        return retList;
-    }
-
-    private static List<ByteBuffer> minimalBuffersFor(List<ByteBuffer> candidates)
-    {
-        List<ByteBuffer> minimalBuffers = new ArrayList<ByteBuffer>(candidates.size());
-        for (ByteBuffer byteBuffer : candidates)
-            minimalBuffers.add(minimalBufferFor(byteBuffer));
-        return minimalBuffers;
-    }
-
-    /**
-     * Merge 2 lists of max cell name components.
-     *
-     * @param maxColumnNames lhs
-     * @param candidates rhs
-     * @param comparator comparator to use
-     * @return a list with biggest column names according to (sub)comparator
-     */
-    public static List<ByteBuffer> mergeMax(List<ByteBuffer> maxColumnNames, List<ByteBuffer> candidates, CellNameType comparator)
-    {
-        if (maxColumnNames.isEmpty())
-            return minimalBuffersFor(candidates);
-
-        if (candidates.isEmpty())
-            return maxColumnNames;
-
-        List<ByteBuffer> biggest = maxColumnNames.size() > candidates.size() ? maxColumnNames : candidates;
-        List<ByteBuffer> smallest = maxColumnNames.size() > candidates.size() ? candidates : maxColumnNames;
-
-        // We want to always copy the smallest list, and maybeGrow does it only if it's actually smaller
-        List<ByteBuffer> retList = smallest.size() == biggest.size()
-                                 ? new ArrayList<>(smallest)
-                                 : maybeGrow(smallest, biggest.size());
-
-        for (int i = 0; i < biggest.size(); i++)
-            retList.set(i, minimalBufferFor(max(retList.get(i), biggest.get(i), comparator.subtype(i))));
-
-        return retList;
-    }
-
-    /**
-     * Checks if the given min/max column names could overlap (i.e they could share some column names based on the max/min column names in the sstables)
-     */
-    public static boolean overlaps(List<ByteBuffer> minColumnNames1, List<ByteBuffer> maxColumnNames1, List<ByteBuffer> minColumnNames2, List<ByteBuffer> maxColumnNames2, CellNameType comparator)
-    {
-        if (minColumnNames1.isEmpty() || maxColumnNames1.isEmpty() || minColumnNames2.isEmpty() || maxColumnNames2.isEmpty())
-            return true;
-
-        return !(compare(maxColumnNames1, minColumnNames2, comparator) < 0 || compare(minColumnNames1, maxColumnNames2, comparator) > 0);
-    }
-
-    private static int compare(List<ByteBuffer> columnNames1, List<ByteBuffer> columnNames2, CellNameType comparator)
-    {
-        for (int i = 0; i < Math.min(columnNames1.size(), columnNames2.size()); i++)
-        {
-            int cmp = comparator.subtype(i).compare(columnNames1.get(i), columnNames2.get(i));
-            if (cmp != 0)
-                return cmp;
-        }
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
deleted file mode 100644
index a1cb199..0000000
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.io.sstable;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.utils.StreamingHistogram;
-
-/**
- * ColumnStats holds information about the columns for one row inside sstable
- */
-public class ColumnStats
-{
-    /** how many columns are there in the row */
-    public final int columnCount;
-
-    /** the largest (client-supplied) timestamp in the row */
-    public final long minTimestamp;
-    public final long maxTimestamp;
-    public final int maxLocalDeletionTime;
-    /** histogram of tombstone drop time */
-    public final StreamingHistogram tombstoneHistogram;
-
-    /** max and min column names according to comparator */
-    public final List<ByteBuffer> minColumnNames;
-    public final List<ByteBuffer> maxColumnNames;
-
-    public final boolean hasLegacyCounterShards;
-
-    public ColumnStats(int columnCount,
-                       long minTimestamp,
-                       long maxTimestamp,
-                       int maxLocalDeletionTime,
-                       StreamingHistogram tombstoneHistogram,
-                       List<ByteBuffer> minColumnNames,
-                       List<ByteBuffer> maxColumnNames,
-                       boolean hasLegacyCounterShards)
-    {
-        this.minTimestamp = minTimestamp;
-        this.maxTimestamp = maxTimestamp;
-        this.maxLocalDeletionTime = maxLocalDeletionTime;
-        this.columnCount = columnCount;
-        this.tombstoneHistogram = tombstoneHistogram;
-        this.minColumnNames = minColumnNames;
-        this.maxColumnNames = maxColumnNames;
-        this.hasLegacyCounterShards = hasLegacyCounterShards;
-    }
-
-    // We use explicit classes for ints and longs instead of generics to avoid boxing and unboxing (See CASSANDRA-8109)
-    public static class MinLongTracker
-    {
-        private final long defaultValue;
-        private boolean isSet = false;
-        private long value;
-
-        public MinLongTracker(long defaultValue)
-        {
-            this.defaultValue = defaultValue;
-        }
-
-        public void update(long value)
-        {
-            if (!isSet)
-            {
-                this.value = value;
-                isSet = true;
-            }
-            else
-            {
-                if (value < this.value)
-                    this.value = value;
-            }
-        }
-
-        public long get()
-        {
-            if (isSet)
-                return value;
-            return defaultValue;
-        }
-    }
-
-    public static class MaxLongTracker
-    {
-        private final long defaultValue;
-        private boolean isSet = false;
-        private long value;
-
-        public MaxLongTracker(long defaultValue)
-        {
-            this.defaultValue = defaultValue;
-        }
-
-        public void update(long value)
-        {
-            if (!isSet)
-            {
-                this.value = value;
-                isSet = true;
-            }
-            else
-            {
-                if (value >this.value)
-                    this.value = value;
-            }
-        }
-
-        public long get()
-        {
-            if (isSet)
-                return value;
-            return defaultValue;
-        }
-    }
-
-    public static class MaxIntTracker
-    {
-        private final int defaultValue;
-        private boolean isSet = false;
-        private int value;
-
-        public MaxIntTracker(int defaultValue)
-        {
-            this.defaultValue = defaultValue;
-        }
-
-        public void update(int value)
-        {
-            if (!isSet)
-            {
-                this.value = value;
-                isSet = true;
-            }
-            else
-            {
-                if (value > this.value)
-                    this.value = value;
-            }
-        }
-
-        public int get()
-        {
-            if (isSet)
-                return value;
-            return defaultValue;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java b/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
index a71daaf..0fe316d 100644
--- a/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
+++ b/src/java/org/apache/cassandra/io/sstable/CorruptSSTableException.java
@@ -25,7 +25,7 @@ public class CorruptSSTableException extends RuntimeException
 
     public CorruptSSTableException(Exception cause, File path)
     {
-        super(cause);
+        super("Corrupted: " + path, cause);
         this.path = path;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index 9f259fe..5ab99e7 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.sstable.metadata.IMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.LegacyMetadataSerializer;
 import org.apache.cassandra.io.sstable.metadata.MetadataSerializer;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.Pair;
 
 import static org.apache.cassandra.io.sstable.Component.separator;
@@ -45,7 +46,6 @@ import static org.apache.cassandra.io.sstable.Component.separator;
  */
 public class Descriptor
 {
-
     public static enum Type
     {
         TEMP("tmp", true), TEMPLINK("tmplink", true), FINAL(null, false);
@@ -58,7 +58,6 @@ public class Descriptor
         }
     }
 
-
     public final File directory;
     /** version has the following format: <code>[a-z]+</code> */
     public final Version version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
index b80bd87..7063057 100644
--- a/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/ISSTableScanner.java
@@ -19,14 +19,13 @@
 
 package org.apache.cassandra.io.sstable;
 
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 
 /**
  * An ISSTableScanner is an abstraction allowing multiple SSTableScanners to be
  * chained together under the hood.  See LeveledCompactionStrategy.getScanners.
  */
-public interface ISSTableScanner extends CloseableIterator<OnDiskAtomIterator>
+public interface ISSTableScanner extends UnfilteredPartitionIterator
 {
     public long getLengthInBytes();
     public long getCurrentPosition();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 3d304c5..d19c8f7 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -22,10 +22,10 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.cassandra.db.composites.CType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileUtils;
@@ -67,35 +67,23 @@ public class IndexHelper
     /**
      * The index of the IndexInfo in which a scan starting with @name should begin.
      *
-     * @param name
-     *         name of the index
-     *
-     * @param indexList
-     *          list of the indexInfo objects
-     *
-     * @param comparator
-     *          comparator type
-     *
-     * @param reversed
-     *          is name reversed
+     * @param name name to search for
+     * @param indexList list of the indexInfo objects
+     * @param comparator the comparator to use
+     * @param reversed whether or not the search is reversed, i.e. we scan forward or backward from name
+     * @param lastIndex where to start the search from in indexList
      *
      * @return int index
      */
-    public static int indexFor(Composite name, List<IndexInfo> indexList, CType comparator, boolean reversed, int lastIndex)
+    public static int indexFor(ClusteringPrefix name, List<IndexInfo> indexList, ClusteringComparator comparator, boolean reversed, int lastIndex)
     {
-        if (name.isEmpty())
-            return lastIndex >= 0 ? lastIndex : reversed ? indexList.size() - 1 : 0;
-
-        if (lastIndex >= indexList.size())
-            return -1;
-
-        IndexInfo target = new IndexInfo(name, name, 0, 0);
+        IndexInfo target = new IndexInfo(name, name, 0, 0, null);
         /*
         Take the example from the unit test, and say your index looks like this:
         [0..5][10..15][20..25]
         and you look for the slice [13..17].
 
-        When doing forward slice, we we doing a binary search comparing 13 (the start of the query)
+        When doing forward slice, we are doing a binary search comparing 13 (the start of the query)
         to the lastName part of the index slot. You'll end up with the "first" slot, going from left to right,
         that may contain the start.
 
@@ -105,81 +93,117 @@ public class IndexHelper
         */
         int startIdx = 0;
         List<IndexInfo> toSearch = indexList;
-        if (lastIndex >= 0)
+        if (reversed)
         {
-            if (reversed)
+            if (lastIndex < indexList.size() - 1)
             {
                 toSearch = indexList.subList(0, lastIndex + 1);
             }
-            else
+        }
+        else
+        {
+            if (lastIndex > 0)
             {
                 startIdx = lastIndex;
                 toSearch = indexList.subList(lastIndex, indexList.size());
             }
         }
-        int index = Collections.binarySearch(toSearch, target, getComparator(comparator, reversed));
+        int index = Collections.binarySearch(toSearch, target, comparator.indexComparator(reversed));
         return startIdx + (index < 0 ? -index - (reversed ? 2 : 1) : index);
     }
 
-    public static Comparator<IndexInfo> getComparator(final CType nameComparator, boolean reversed)
-    {
-        return reversed ? nameComparator.indexReverseComparator() : nameComparator.indexComparator();
-    }
-
     public static class IndexInfo
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0));
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new IndexInfo(null, null, 0, 0, null));
 
         public final long width;
-        public final Composite lastName;
-        public final Composite firstName;
+        public final ClusteringPrefix lastName;
+        public final ClusteringPrefix firstName;
         public final long offset;
 
-        public IndexInfo(Composite firstName, Composite lastName, long offset, long width)
+        // If there is an open range tombstone marker at the end of the index block, this holds that
+        // marker's deletion time; null otherwise.
+        public final DeletionTime endOpenMarker;
+
+        public IndexInfo(ClusteringPrefix firstName,
+                         ClusteringPrefix lastName,
+                         long offset,
+                         long width,
+                         DeletionTime endOpenMarker)
         {
             this.firstName = firstName;
             this.lastName = lastName;
             this.offset = offset;
             this.width = width;
+            this.endOpenMarker = endOpenMarker;
         }
 
-        public static class Serializer implements ISerializer<IndexInfo>
+        public static class Serializer
         {
-            private final CType type;
+            private final CFMetaData metadata;
+            private final Version version;
 
-            public Serializer(CType type)
+            public Serializer(CFMetaData metadata, Version version)
             {
-                this.type = type;
+                this.metadata = metadata;
+                this.version = version;
             }
 
-            public void serialize(IndexInfo info, DataOutputPlus out) throws IOException
+            public void serialize(IndexInfo info, DataOutputPlus out, SerializationHeader header) throws IOException
             {
-                type.serializer().serialize(info.firstName, out);
-                type.serializer().serialize(info.lastName, out);
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+                clusteringSerializer.serialize(info.firstName, out);
+                clusteringSerializer.serialize(info.lastName, out);
                 out.writeLong(info.offset);
                 out.writeLong(info.width);
+
+                if (version.storeRows())
+                {
+                    out.writeBoolean(info.endOpenMarker != null);
+                    if (info.endOpenMarker != null)
+                        DeletionTime.serializer.serialize(info.endOpenMarker, out);
+                }
             }
 
-            public IndexInfo deserialize(DataInput in) throws IOException
+            public IndexInfo deserialize(DataInput in, SerializationHeader header) throws IOException
             {
-                return new IndexInfo(type.serializer().deserialize(in),
-                                     type.serializer().deserialize(in),
-                                     in.readLong(),
-                                     in.readLong());
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+
+                ClusteringPrefix firstName = clusteringSerializer.deserialize(in);
+                ClusteringPrefix lastName = clusteringSerializer.deserialize(in);
+                long offset = in.readLong();
+                long width = in.readLong();
+                DeletionTime endOpenMarker = version.storeRows() && in.readBoolean()
+                                           ? DeletionTime.serializer.deserialize(in)
+                                           : null;
+
+                return new IndexInfo(firstName, lastName, offset, width, endOpenMarker);
             }
 
-            public long serializedSize(IndexInfo info, TypeSizes typeSizes)
+            public long serializedSize(IndexInfo info, SerializationHeader header, TypeSizes typeSizes)
             {
-                return type.serializer().serializedSize(info.firstName, typeSizes)
-                     + type.serializer().serializedSize(info.lastName, typeSizes)
-                     + typeSizes.sizeof(info.offset)
-                     + typeSizes.sizeof(info.width);
+                ISerializer<ClusteringPrefix> clusteringSerializer = metadata.serializers().clusteringPrefixSerializer(version, header);
+                long size = clusteringSerializer.serializedSize(info.firstName, typeSizes)
+                          + clusteringSerializer.serializedSize(info.lastName, typeSizes)
+                          + typeSizes.sizeof(info.offset)
+                          + typeSizes.sizeof(info.width);
+
+                if (version.storeRows())
+                {
+                    size += typeSizes.sizeof(info.endOpenMarker != null);
+                    if (info.endOpenMarker != null)
+                        size += DeletionTime.serializer.serializedSize(info.endOpenMarker, typeSizes);
+                }
+                return size;
             }
         }
 
         public long unsharedHeapSize()
         {
-            return EMPTY_SIZE + firstName.unsharedHeapSize() + lastName.unsharedHeapSize();
+            return EMPTY_SIZE
+                 + firstName.unsharedHeapSize()
+                 + lastName.unsharedHeapSize()
+                 + (endOpenMarker == null ? 0 : endOpenMarker.unsharedHeapSize());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index 7df7349..90c5b0e 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -28,10 +26,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -110,7 +107,7 @@ public class IndexSummary extends WrappedSharedCloseable
 
     // binary search is notoriously more difficult to get right than it looks; this is lifted from
     // Harmony's Collections implementation
-    public int binarySearch(RowPosition key)
+    public int binarySearch(PartitionPosition key)
     {
         // We will be comparing non-native Keys, so use a buffer with appropriate byte order
         ByteBuffer hollow = MemoryUtil.getHollowDirectByteBuffer().order(ByteOrder.BIG_ENDIAN);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index e5f61b2..f1e01f2 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -57,7 +57,7 @@ public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
                     return true;
                 }
 
-                public void reduce(DecoratedKey current)
+                public void reduce(int idx, DecoratedKey current)
                 {
                     reduced = current;
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 17f9a8d..70ab99c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -18,31 +18,24 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.*;
-import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.serializers.MarshalException;
 
-    public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterator>, OnDiskAtomIterator
+public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implements Comparable<SSTableIdentityIterator>, UnfilteredRowIterator
 {
+    private final SSTableReader sstable;
     private final DecoratedKey key;
-    private final DataInput in;
-    public final ColumnSerializer.Flag flag;
-
-    private final ColumnFamily columnFamily;
-    private final Iterator<OnDiskAtom> atomIterator;
-    private final boolean validateColumns;
+    private final DeletionTime partitionLevelDeletion;
     private final String filename;
 
-    // Not every SSTableIdentifyIterator is attached to a sstable, so this can be null.
-    private final SSTableReader sstable;
+    private final SSTableSimpleIterator iterator;
+    private final Row staticRow;
 
     /**
      * Used to iterate through the columns of a row.
@@ -52,80 +45,66 @@ import org.apache.cassandra.serializers.MarshalException;
      */
     public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key)
     {
-        this(sstable, file, key, false);
-    }
-
-    /**
-     * Used to iterate through the columns of a row.
-     * @param sstable SSTable we are reading ffrom.
-     * @param file Reading using this file.
-     * @param key Key of this row.
-     * @param checkData if true, do its best to deserialize and check the coherence of row data
-     */
-    public SSTableIdentityIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key, boolean checkData)
-    {
-        this(sstable.metadata, file, file.getPath(), key, checkData, sstable, ColumnSerializer.Flag.LOCAL);
-    }
-
-    // sstable may be null *if* checkData is false
-    // If it is null, we assume the data is in the current file format
-    private SSTableIdentityIterator(CFMetaData metadata,
-                                    FileDataInput in,
-                                    String filename,
-                                    DecoratedKey key,
-                                    boolean checkData,
-                                    SSTableReader sstable,
-                                    ColumnSerializer.Flag flag)
-    {
-        assert !checkData || (sstable != null);
-        this.in = in;
-        this.filename = filename;
-        this.key = key;
-        this.flag = flag;
-        this.validateColumns = checkData;
         this.sstable = sstable;
-
-        Version dataVersion = sstable == null ? DatabaseDescriptor.getSSTableFormat().info.getLatestVersion() : sstable.descriptor.version;
-        int expireBefore = (int) (System.currentTimeMillis() / 1000);
-        columnFamily = ArrayBackedSortedColumns.factory.create(metadata);
+        this.filename = file.getPath();
+        this.key = key;
 
         try
         {
-            columnFamily.delete(DeletionTime.serializer.deserialize(in));
-            atomIterator = columnFamily.metadata().getOnDiskIterator(in, flag, expireBefore, dataVersion);
+            this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
+            SerializationHelper helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+            this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion);
+            this.staticRow = iterator.readStaticRow();
         }
         catch (IOException e)
         {
-            if (sstable != null)
-                sstable.markSuspect();
+            sstable.markSuspect();
             throw new CorruptSSTableException(e, filename);
         }
     }
 
-    public DecoratedKey getKey()
+    public CFMetaData metadata()
+    {
+        return sstable.metadata;
+    }
+
+    public PartitionColumns columns()
+    {
+        return metadata().partitionColumns();
+    }
+
+    public boolean isReverseOrder()
+    {
+        return false;
+    }
+
+    public DecoratedKey partitionKey()
     {
         return key;
     }
 
-    public ColumnFamily getColumnFamily()
+    public DeletionTime partitionLevelDeletion()
     {
-        return columnFamily;
+        return partitionLevelDeletion;
     }
 
-    public boolean hasNext()
+    public Row staticRow()
+    {
+        return staticRow;
+    }
+
+    protected Unfiltered computeNext()
     {
         try
         {
-            return atomIterator.hasNext();
+            return iterator.hasNext() ? iterator.next() : endOfData();
         }
         catch (IOError e)
         {
-            // catch here b/c atomIterator is an AbstractIterator; hasNext reads the value
             if (e.getCause() instanceof IOException)
             {
-                if (sstable != null)
-                    sstable.markSuspect();
-                throw new CorruptSSTableException((IOException)e.getCause(), filename);
+                sstable.markSuspect();
+                throw new CorruptSSTableException((Exception)e.getCause(), filename);
             }
             else
             {
@@ -134,26 +113,6 @@ import org.apache.cassandra.serializers.MarshalException;
         }
     }
 
-    public OnDiskAtom next()
-    {
-        try
-        {
-            OnDiskAtom atom = atomIterator.next();
-            if (validateColumns)
-                atom.validateFields(columnFamily.metadata());
-            return atom;
-        }
-        catch (MarshalException me)
-        {
-            throw new CorruptSSTableException(me, filename);
-        }
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
     public void close()
     {
         // creator is responsible for closing file when finished
@@ -161,16 +120,14 @@ import org.apache.cassandra.serializers.MarshalException;
 
     public String getPath()
     {
-        // if input is from file, then return that path, otherwise it's from streaming
-        if (in instanceof RandomAccessReader)
-        {
-            RandomAccessReader file = (RandomAccessReader) in;
-            return file.getPath();
-        }
-        else
-        {
-            throw new UnsupportedOperationException();
-        }
+        return filename;
+    }
+
+    public RowStats stats()
+    {
+        // We could return sstable.header.stats(), but this may not be as accurate as the actual sstable stats (see
+        // SerializationHeader.make() for details) so we use the latter instead.
+        return new RowStats(sstable.getMinTimestamp(), sstable.getMinLocalDeletionTime(), sstable.getMinTTL(), sstable.getAvgColumnSetPerRow());
     }
 
     public int compareTo(SSTableIdentityIterator o)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 9497bf3..260e137 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
@@ -108,42 +108,36 @@ public class SSTableRewriter extends Transactional.AbstractTransactional impleme
         return writer;
     }
 
-    public RowIndexEntry append(AbstractCompactedRow row)
+    public RowIndexEntry append(UnfilteredRowIterator partition)
     {
         // we do this before appending to ensure we can resetAndTruncate() safely if the append fails
-        maybeReopenEarly(row.key);
-        RowIndexEntry index = writer.append(row);
-        if (!isOffline)
+        DecoratedKey key = partition.partitionKey();
+        maybeReopenEarly(key);
+        RowIndexEntry index = writer.append(partition);
+        if (!isOffline && index != null)
         {
-            if (index == null)
+            boolean save = false;
+            for (SSTableReader reader : transaction.originals())
             {
-                cfs.invalidateCachedRow(row.key);
-            }
-            else
-            {
-                boolean save = false;
-                for (SSTableReader reader : transaction.originals())
+                if (reader.getCachedPosition(key, false) != null)
                 {
-                    if (reader.getCachedPosition(row.key, false) != null)
-                    {
-                        save = true;
-                        break;
-                    }
+                    save = true;
+                    break;
                 }
-                if (save)
-                    cachedKeys.put(row.key, index);
             }
+            if (save)
+                cachedKeys.put(key, index);
         }
         return index;
     }
 
     // attempts to append the row, if fails resets the writer position
-    public RowIndexEntry tryAppend(AbstractCompactedRow row)
+    public RowIndexEntry tryAppend(UnfilteredRowIterator partition)
     {
         writer.mark();
         try
         {
-            return append(row);
+            return append(partition);
         }
         catch (Throwable t)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
new file mode 100644
index 0000000..1188de1
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.io.IOError;
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * Utility class to handle deserializing atoms from sstables.
+ *
+ * Note that this is not a full-fledged UnfilteredRowIterator. It is also not closeable; it is always
+ * the job of the user to close the underlying resources.
+ */
+public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> implements Iterator<Unfiltered>
+{
+    protected final CFMetaData metadata;
+    protected final DataInput in;
+    protected final SerializationHelper helper;
+
+    private SSTableSimpleIterator(CFMetaData metadata, DataInput in, SerializationHelper helper)
+    {
+        this.metadata = metadata;
+        this.in = in;
+        this.helper = helper;
+    }
+
+    public static SSTableSimpleIterator create(CFMetaData metadata, DataInput in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+    {
+        if (helper.version < MessagingService.VERSION_30)
+            return new OldFormatIterator(metadata, in, helper, partitionDeletion);
+        else
+            return new CurrentFormatIterator(metadata, in, header, helper);
+    }
+
+    public abstract Row readStaticRow() throws IOException;
+
+    private static class CurrentFormatIterator extends SSTableSimpleIterator
+    {
+        private final SerializationHeader header;
+
+        private final ReusableRow row;
+        private final RangeTombstoneMarker.Builder markerBuilder;
+
+        private CurrentFormatIterator(CFMetaData metadata, DataInput in, SerializationHeader header, SerializationHelper helper)
+        {
+            super(metadata, in, helper);
+            this.header = header;
+
+            int clusteringSize = metadata.comparator.size();
+            Columns regularColumns = header == null ? metadata.partitionColumns().regulars : header.columns().regulars;
+
+            this.row = new ReusableRow(clusteringSize, regularColumns, true, metadata.isCounter());
+            this.markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize);
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            return header.hasStatic()
+                ? UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper)
+                : Rows.EMPTY_STATIC_ROW;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            try
+            {
+                Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, header, helper, row.writer(), markerBuilder.reset());
+
+                return kind == null
+                     ? endOfData()
+                     : (kind == Unfiltered.Kind.ROW ? row : markerBuilder.build());
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+    }
+
+    private static class OldFormatIterator extends SSTableSimpleIterator
+    {
+        private final UnfilteredDeserializer deserializer;
+
+        private OldFormatIterator(CFMetaData metadata, DataInput in, SerializationHelper helper, DeletionTime partitionDeletion)
+        {
+            super(metadata, in, helper);
+            // We use an UnfilteredDeserializer because even though we don't need all its fanciness, it happens to handle all
+            // the details we need for reading the old format.
+            this.deserializer = UnfilteredDeserializer.create(metadata, in, null, helper, partitionDeletion, false);
+        }
+
+        public Row readStaticRow() throws IOException
+        {
+            if (metadata.isCompactTable())
+            {
+                // For static compact tables, in the old format, static columns are intermingled with the other columns, so we
+                // need to extract them. This implies 2 passes (one to extract the static columns, then one for the other values).
+                if (metadata.isStaticCompactTable())
+                {
+                    // Because we don't support streaming from old file version, the only case we should get there is for compaction,
+                    // where the DataInput should be a file based one.
+                    assert in instanceof FileDataInput;
+                    FileDataInput file = (FileDataInput)in;
+                    FileMark mark = file.mark();
+                    Row staticRow = LegacyLayout.extractStaticColumns(metadata, file, metadata.partitionColumns().statics);
+                    file.reset(mark);
+
+                    // We've extracted the static columns, so we must ignore them on the 2nd pass
+                    ((UnfilteredDeserializer.OldFormatDeserializer)deserializer).setSkipStatic();
+                    return staticRow;
+                }
+                else
+                {
+                    return Rows.EMPTY_STATIC_ROW;
+                }
+            }
+
+            return deserializer.hasNext() && deserializer.nextIsStatic()
+                 ? (Row)deserializer.readNext()
+                 : Rows.EMPTY_STATIC_ROW;
+
+        }
+
+        protected Unfiltered computeNext()
+        {
+            try
+            {
+                if (!deserializer.hasNext())
+                    return endOfData();
+
+                Unfiltered unfiltered = deserializer.readNext();
+                if (metadata.isStaticCompactTable() && unfiltered.kind() == Unfiltered.Kind.ROW)
+                {
+                    Row row = (Row) unfiltered;
+                    ColumnDefinition def = metadata.getColumnDefinition(LegacyLayout.encodeClustering(metadata, row.clustering()));
+                    if (def != null && def.isStatic())
+                        return computeNext();
+                }
+                return unfiltered;
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 4bb75bc..a226585 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
@@ -28,30 +29,28 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.TypeSizes;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 /**
  * A SSTable writer that doesn't assume rows are in sorted order.
+ * <p>
  * This writer buffers rows in memory and then writes them all in sorted order.
  * To avoid loading the entire data set in memory, the number of rows buffered
  * is configurable. Each time the threshold is met, one SSTable will be
  * created (and the buffer is reset).
  *
- * @see AbstractSSTableSimpleWriter
- *
- * @deprecated this class is depracted in favor of {@link CQLSSTableWriter}.
+ * @see SSTableSimpleWriter
  */
-@Deprecated
-public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
+class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 {
     private static final Buffer SENTINEL = new Buffer();
 
@@ -62,92 +61,130 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
     private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
 
-    /**
-     * Create a new buffering writer.
-     * @param directory the directory where to write the sstables
-     * @param partitioner  the partitioner
-     * @param keyspace the keyspace name
-     * @param columnFamily the column family name
-     * @param comparator the column family comparator
-     * @param subComparator the column family subComparator or null if not a Super column family.
-     * @param bufferSizeInMB the data size in MB before which a sstable is written and the buffer reseted. This correspond roughly to the written
-     * data size (i.e. the size of the create sstable). The actual size used in memory will be higher (by how much depends on the size of the
-     * columns you add). For 1GB of heap, a 128 bufferSizeInMB is probably a reasonable choice. If you experience OOM, this value should be lowered.
-     */
-    public SSTableSimpleUnsortedWriter(File directory,
-                                       IPartitioner partitioner,
-                                       String keyspace,
-                                       String columnFamily,
-                                       AbstractType<?> comparator,
-                                       AbstractType<?> subComparator,
-                                       int bufferSizeInMB,
-                                       CompressionParameters compressParameters)
+    SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns, long bufferSizeInMB)
     {
-        this(directory, CFMetaData.denseCFMetaData(keyspace, columnFamily, comparator, subComparator).compressionParameters(compressParameters), partitioner, bufferSizeInMB);
+        super(directory, metadata, partitioner, columns);
+        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
+        diskWriter.start();
     }
 
-    public SSTableSimpleUnsortedWriter(File directory,
-                                       IPartitioner partitioner,
-                                       String keyspace,
-                                       String columnFamily,
-                                       AbstractType<?> comparator,
-                                       AbstractType<?> subComparator,
-                                       int bufferSizeInMB)
+    PartitionUpdate getUpdateFor(DecoratedKey key)
     {
-        this(directory, partitioner, keyspace, columnFamily, comparator, subComparator, bufferSizeInMB, new CompressionParameters(null));
-    }
+        assert key != null;
 
-    public SSTableSimpleUnsortedWriter(File directory, CFMetaData metadata, IPartitioner partitioner, long bufferSizeInMB)
-    {
-        super(directory, metadata, partitioner);
-        bufferSize = bufferSizeInMB * 1024L * 1024L;
-        diskWriter.start();
+        PartitionUpdate previous = buffer.get(key);
+        if (previous == null)
+        {
+            previous = createPartitionUpdate(key);
+            count(PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion(), TypeSizes.NATIVE));
+            previous.allowNewUpdates();
+            buffer.put(key, previous);
+        }
+        return previous;
     }
 
-    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException
+    private void count(long size)
     {
-        // Nothing to do since we'll sync if needed in addColumn.
+        currentSize += size;
     }
 
-    @Override
-    protected void addColumn(Cell cell) throws IOException
+    private void countCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path)
     {
-        super.addColumn(cell);
-        countColumn(cell);
-    }
+        // Note that the accounting of a cell is a bit inaccurate (it doesn't take some of the file format optimization into account)
+        // and the maintaining of the bufferSize is in general not perfect. This has always been the case for this class but we should
+        // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
+        // to count since it will take a lot more space in memory and the bufferSize is first and foremost used to avoid OOM when
+        // using this writer.
 
-    protected void countColumn(Cell cell) throws IOException
-    {
-        currentSize += cell.serializedSize(metadata.comparator, TypeSizes.NATIVE);
+        count(1); // Each cell has a byte flag on disk
 
-        // We don't want to sync in writeRow() only as this might blow up the bufferSize for wide rows.
-        if (currentSize > bufferSize)
-            replaceColumnFamily();
+        if (value.hasRemaining())
+            count(column.type.writtenLength(value, TypeSizes.NATIVE));
+
+        count(8); // timestamp
+        if (info.hasLocalDeletionTime())
+            count(4);
+        if (info.hasTTL())
+            count(4);
+
+        if (path != null)
+        {
+            assert path.size() == 1;
+            count(2 + path.get(0).remaining());
+        }
     }
 
-    protected ColumnFamily getColumnFamily()
+    private void maybeSync() throws SyncException
     {
-        ColumnFamily previous = buffer.get(currentKey);
-        // If the CF already exist in memory, we'll just continue adding to it
-        if (previous == null)
+        try
         {
-            previous = createColumnFamily();
-            buffer.put(currentKey, previous);
-
-            // Since this new CF will be written by the next sync(), count its header. And a CF header
-            // on disk is:
-            //   - the row key: 2 bytes size + key size bytes
-            //   - the row level deletion infos: 4 + 8 bytes
-            currentSize += 14 + currentKey.getKey().remaining();
+            if (currentSize > bufferSize)
+                sync();
+        }
+        catch (IOException e)
+        {
+            // addColumn does not throw IOException but we want to report this to the user,
+            // so wrap it in a temporary RuntimeException that we'll catch in rawAddRow above.
+            throw new SyncException(e);
         }
-        return previous;
     }
 
-    protected ColumnFamily createColumnFamily()
+    private PartitionUpdate createPartitionUpdate(DecoratedKey key)
     {
-        return ArrayBackedSortedColumns.factory.create(metadata);
+        return new PartitionUpdate(metadata, key, columns, 4)
+        {
+            @Override
+            protected StaticWriter createStaticWriter()
+            {
+                return new StaticWriter()
+                {
+                    @Override
+                    public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+                    {
+                        super.writeCell(column, isCounter, value, info, path);
+                        countCell(column, value, info, path);
+                    }
+
+                    @Override
+                    public void endOfRow()
+                    {
+                        super.endOfRow();
+                        maybeSync();
+                    }
+                };
+            }
+
+            @Override
+            protected Writer createWriter()
+            {
+                return new RegularWriter()
+                {
+                    @Override
+                    public void writeClusteringValue(ByteBuffer value)
+                    {
+                        super.writeClusteringValue(value);
+                        count(2 + value.remaining());
+                    }
+
+                    @Override
+                    public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+                    {
+                        super.writeCell(column, isCounter, value, info, path);
+                        countCell(column, value, info, path);
+                    }
+
+                    @Override
+                    public void endOfRow()
+                    {
+                        super.endOfRow();
+                        maybeSync();
+                    }
+                };
+            }
+        };
     }
 
+    @Override
     public void close() throws IOException
     {
         sync();
@@ -163,23 +200,14 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         checkForWriterException();
     }
 
-    // This is overridden by CQLSSTableWriter to hold off replacing column family until the next iteration through
-    protected void replaceColumnFamily() throws IOException
-    {
-        sync();
-    }
-
     protected void sync() throws IOException
     {
         if (buffer.isEmpty())
             return;
 
-        columnFamily = null;
         put(buffer);
         buffer = new Buffer();
         currentSize = 0;
-        columnFamily = getColumnFamily();
-        buffer.setFirstInsertedKey(currentKey);
     }
 
     private void put(Buffer buffer) throws IOException
@@ -211,56 +239,45 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         }
     }
 
-    // typedef
-    private static class Buffer extends TreeMap<DecoratedKey, ColumnFamily> {
-        private DecoratedKey firstInsertedKey;
-
-        public void setFirstInsertedKey(DecoratedKey firstInsertedKey) {
-            this.firstInsertedKey = firstInsertedKey;
-        }
-
-        public DecoratedKey getFirstInsertedKey() {
-            return firstInsertedKey;
+    static class SyncException extends RuntimeException
+    {
+        SyncException(IOException ioe)
+        {
+            super(ioe);
         }
     }
 
+    // typedef
+    static class Buffer extends TreeMap<DecoratedKey, PartitionUpdate> {}
+
     private class DiskWriter extends Thread
     {
         volatile Throwable exception = null;
 
         public void run()
         {
+            while (true)
             {
-                while (true)
+                try
                 {
-                    try
-                    {
-                        Buffer b = writeQueue.take();
-                        if (b == SENTINEL)
-                            return;
+                    Buffer b = writeQueue.take();
+                    if (b == SENTINEL)
+                        return;
 
-                        try (SSTableWriter writer = getWriter();)
-                        {
-                            for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
-                            {
-                                if (entry.getValue().getColumnCount() > 0)
-                                    writer.append(entry.getKey(), entry.getValue());
-                                else if (!entry.getKey().equals(b.getFirstInsertedKey()))
-                                    throw new AssertionError("Empty partition");
-                            }
-                            
-                            writer.finish(false);
-                        }
-                    }
-                    catch (Throwable e)
+                    try (SSTableWriter writer = createWriter())
                     {
-                        JVMStabilityInspector.inspectThrowable(e);
-                        // Keep only the first exception
-                        if (exception == null)
-                            exception = e;
+                        for (Map.Entry<DecoratedKey, PartitionUpdate> entry : b.entrySet())
+                            writer.append(entry.getValue().unfilteredIterator());
+                        writer.finish(false);
                     }
                 }
-
+                catch (Throwable e)
+                {
+                    JVMStabilityInspector.inspectThrowable(e);
+                    // Keep only the first exception
+                    if (exception == null)
+                        exception = e;
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 2601d6d..8a83242 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -18,80 +18,93 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Throwables;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.rows.RowStats;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.CounterId;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * A SSTable writer that assumes rows are in (partitioner) sorted order.
+ * <p>
  * Contrarily to SSTableSimpleUnsortedWriter, this writer does not buffer
  * anything into memory, however it assumes that rows are added in sorted order
  * (an exception will be thrown otherwise), which for the RandomPartitioner
  * means that rows should be added by increasing md5 of the row key. This is
  * rarely possible and SSTableSimpleUnsortedWriter should most of the time be
  * preferred.
- *
- * @see AbstractSSTableSimpleWriter
- *
- * @deprecated this class is depracted in favor of {@link CQLSSTableWriter}.
  */
-@Deprecated
-public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
+class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
 {
-    private final SSTableWriter writer;
+    protected DecoratedKey currentKey;
+    protected PartitionUpdate update;
 
-    /**
-     * Create a new writer.
-     * @param directory the directory where to write the sstable
-     * @param partitioner the partitioner
-     * @param keyspace the keyspace name
-     * @param columnFamily the column family name
-     * @param comparator the column family comparator
-     * @param subComparator the column family subComparator or null if not a Super column family.
-     */
-    public SSTableSimpleWriter(File directory,
-                               IPartitioner partitioner,
-                               String keyspace,
-                               String columnFamily,
-                               AbstractType<?> comparator,
-                               AbstractType<?> subComparator)
+    private SSTableWriter writer;
+
+    protected SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner, PartitionColumns columns)
     {
-        this(directory, CFMetaData.denseCFMetaData(keyspace, columnFamily, comparator, subComparator), partitioner);
+        super(directory, metadata, partitioner, columns);
+    }
+
+    private SSTableWriter getOrCreateWriter()
+    {
+        if (writer == null)
+            writer = createWriter();
+
+        return writer;
     }
 
-    public SSTableSimpleWriter(File directory, CFMetaData metadata, IPartitioner partitioner)
+    PartitionUpdate getUpdateFor(DecoratedKey key) throws IOException
     {
-        super(directory, metadata, partitioner);
-        writer = getWriter();
+        assert key != null;
+
+        // If that's not the current key, write the current one if necessary and create a new
+        // update for the new key.
+        if (!key.equals(currentKey))
+        {
+            if (update != null)
+                writePartition(update);
+            currentKey = key;
+            update = new PartitionUpdate(metadata, currentKey, columns, 4);
+        }
+
+        assert update != null;
+        return update;
     }
 
     public void close()
     {
         try
         {
-            if (currentKey != null)
-                writeRow(currentKey, columnFamily);
-            writer.finish(false);
+            if (update != null)
+                writePartition(update);
+            if (writer != null)
+                writer.finish(false);
         }
         catch (Throwable t)
         {
-            throw Throwables.propagate(writer.abort(t));
+            throw Throwables.propagate(writer == null ? t : writer.abort(t));
         }
     }
 
-    protected void writeRow(DecoratedKey key, ColumnFamily columnFamily)
-    {
-        writer.append(key, columnFamily);
-    }
-
-    protected ColumnFamily getColumnFamily()
+    private void writePartition(PartitionUpdate update) throws IOException
     {
-        return ArrayBackedSortedColumns.factory.create(metadata);
+        getOrCreateWriter().append(update.unfilteredIterator());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
index ca003b6..1286f16 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFormat.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.io.sstable.format;
 import com.google.common.base.CharMatcher;
 import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.ColumnSerializer;
-import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.LegacyLayout;
 import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.CompactionController;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.io.util.FileDataInput;
@@ -45,11 +43,7 @@ public interface SSTableFormat
     SSTableWriter.Factory getWriterFactory();
     SSTableReader.Factory getReaderFactory();
 
-    Iterator<OnDiskAtom> getOnDiskIterator(FileDataInput in, ColumnSerializer.Flag flag, int expireBefore, CFMetaData cfm, Version version);
-
-    AbstractCompactedRow getCompactedRowWriter(CompactionController controller, ImmutableList<OnDiskAtomIterator> onDiskAtomIterators);
-
-    RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm);
+    RowIndexEntry.IndexSerializer<?> getIndexSerializer(CFMetaData cfm, Version version, SerializationHeader header);
 
     public static enum Type
     {
@@ -62,6 +56,7 @@ public interface SSTableFormat
 
         public final SSTableFormat info;
         public final String name;
+
         private Type(String name, SSTableFormat info)
         {
             //Since format comes right after generation


Mime
View raw message