cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xe...@apache.org
Subject cassandra git commit: Add sstable flush observer
Date Thu, 19 Nov 2015 22:19:17 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk b636b0742 -> f81a91d3f


Add sstable flush observer

patch by Pavel Yaskevich; reviewed by Sam Tunnicliffe for CASSANDRA-10678


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f81a91d3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f81a91d3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f81a91d3

Branch: refs/heads/trunk
Commit: f81a91d3fe0d1cd93f093c74356a1d7d018ed22f
Parents: b636b07
Author: Pavel Yaskevich <xedin@apache.org>
Authored: Fri Nov 6 18:38:47 2015 -0800
Committer: Pavel Yaskevich <xedin@apache.org>
Committed: Thu Nov 19 14:16:59 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   2 +-
 .../org/apache/cassandra/db/ColumnIndex.java    |  18 +-
 .../compaction/AbstractCompactionStrategy.java  |  11 +-
 .../db/compaction/CompactionManager.java        |   2 +
 .../compaction/CompactionStrategyManager.java   |  13 +-
 .../cassandra/db/compaction/Upgrader.java       |   1 +
 .../writers/DefaultCompactionWriter.java        |   1 +
 .../writers/MajorLeveledCompactionWriter.java   |   1 +
 .../writers/MaxSSTableSizeWriter.java           |   1 +
 .../SplittingSizeTieredCompactionWriter.java    |   1 +
 src/java/org/apache/cassandra/index/Index.java  |  15 ++
 .../io/sstable/AbstractSSTableSimpleWriter.java |   4 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  11 +-
 .../io/sstable/SimpleSSTableMultiWriter.java    |   4 +-
 .../io/sstable/format/SSTableFlushObserver.java |  55 +++++
 .../io/sstable/format/SSTableWriter.java        |  83 +++++--
 .../io/sstable/format/big/BigFormat.java        |   9 +-
 .../io/sstable/format/big/BigTableWriter.java   |   8 +-
 .../apache/cassandra/db/RowIndexEntryTest.java  |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   2 +-
 .../db/lifecycle/RealTransactionsTest.java      |   1 +
 .../io/sstable/SSTableRewriterTest.java         |   2 +-
 .../format/SSTableFlushObserverTest.java        | 217 +++++++++++++++++++
 24 files changed, 425 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd68a3c..d4efbcc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Add sstable flush observer (CASSANDRA-10678)
  * Improve NTS endpoints calculation (CASSANDRA-10200)
  * Improve performance of the folderSize function (CASSANDRA-10677)
  * Add support for type casting in selection clause (CASSANDRA-10310)
@@ -6,7 +7,6 @@
  * Abort in-progress queries that time out (CASSANDRA-7392)
  * Add transparent data encryption core classes (CASSANDRA-9945)
 
-
 3.1
 Merged from 3.0:
  * Correctly preserve deletion info on updated rows when notifying indexers

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 23cede4..08ce2dd 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -469,7 +469,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleTransaction txn)
     {
-        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, txn);
+        return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), txn);
     }
 
     /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index ede3f79..749c155 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.IndexHelper;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -44,11 +45,15 @@ public class ColumnIndex
         this.columnsIndex = columnsIndex;
     }
 
-    public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator, SequentialWriter output, SerializationHeader header, Version version) throws IOException
+    public static ColumnIndex writeAndBuildIndex(UnfilteredRowIterator iterator,
+                                                 SequentialWriter output,
+                                                 SerializationHeader header,
+                                                 Collection<SSTableFlushObserver> observers,
+                                                 Version version) throws IOException
     {
         assert !iterator.isEmpty() && version.storeRows();
 
-        Builder builder = new Builder(iterator, output, header, version.correspondingMessagingVersion());
+        Builder builder = new Builder(iterator, output, header, observers, version.correspondingMessagingVersion());
         return builder.build();
     }
 
@@ -83,15 +88,19 @@ public class ColumnIndex
 
         private DeletionTime openMarker;
 
+        private final Collection<SSTableFlushObserver> observers;
+
         public Builder(UnfilteredRowIterator iterator,
                        SequentialWriter writer,
                        SerializationHeader header,
+                       Collection<SSTableFlushObserver> observers,
                        int version)
         {
             this.iterator = iterator;
             this.writer = writer;
             this.header = header;
             this.version = version;
+            this.observers = observers == null ? Collections.emptyList() : observers;
             this.initialPosition = writer.position();
         }
 
@@ -142,6 +151,11 @@ public class ColumnIndex
             }
 
             UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+
+            // notify observers about each new cell added to the row
+            if (!observers.isEmpty() && unfiltered.isRow())
+                ((Row) unfiltered).stream().forEach(cell -> observers.forEach((o) -> o.nextCell(cell)));
+
             lastClustering = unfiltered.clustering();
             previousRowStart = pos;
             ++written;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index ae8839e..cab56bb 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -28,6 +28,7 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 import org.apache.cassandra.io.sstable.SimpleSSTableMultiWriter;
@@ -509,8 +510,14 @@ public abstract class AbstractCompactionStrategy
         return groupedSSTables;
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector meta, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       MetadataCollector meta,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleTransaction txn)
     {
-        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, txn);
+        return SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfs.metadata, meta, header, indexes, txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 559a2ea..02d6aa1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -963,6 +963,7 @@ public class CompactionManager implements CompactionManagerMBean
                                     repairedAt,
                                     sstable.getSSTableLevel(),
                                     sstable.header,
+                                    cfs.indexManager.listIndexes(),
                                     txn);
     }
 
@@ -995,6 +996,7 @@ public class CompactionManager implements CompactionManagerMBean
                                     cfs.metadata,
                                     new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
                                     SerializationHeader.make(cfs.metadata, sstables),
+                                    cfs.indexManager.listIndexes(),
                                     txn);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index bd72c64..7c7e86a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -22,6 +22,7 @@ import java.util.*;
 import java.util.concurrent.Callable;
 
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.index.Index;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -499,15 +500,21 @@ public class CompactionStrategyManager implements INotificationConsumer
         return Boolean.parseBoolean(params.options().get(AbstractCompactionStrategy.ONLY_PURGE_REPAIRED_TOMBSTONES));
     }
 
-    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
+    public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor,
+                                                       long keyCount,
+                                                       long repairedAt,
+                                                       MetadataCollector collector,
+                                                       SerializationHeader header,
+                                                       Collection<Index> indexes,
+                                                       LifecycleTransaction txn)
     {
         if (repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
         {
-            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+            return unrepaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
         else
         {
-            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, txn);
+            return repaired.createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, indexes, txn);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index fcd1a3c..3f0f9a3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -75,6 +75,7 @@ public class Upgrader
                                     cfs.metadata,
                                     sstableMetadataCollector,
                                     SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
+                                    cfs.indexManager.listIndexes(),
                                     transaction);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8b90224..8b4351f 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -67,6 +67,7 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
                                                     cfs.metadata,
                                                     new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    cfs.indexManager.listIndexes(),
                                                     txn);
         sstableWriter.switchWriter(writer);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 6d191f8..b0c4562 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -106,6 +106,7 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    cfs.indexManager.listIndexes(),
                                                     txn);
         sstableWriter.switchWriter(writer);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 142fe87..1dc72e7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -87,6 +87,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    cfs.indexManager.listIndexes(),
                                                     txn);
 
         sstableWriter.switchWriter(writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index 796391c..3a7f526 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -111,6 +111,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
                                                     cfs.metadata,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, 0),
                                                     SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    cfs.indexManager.listIndexes(),
                                                     txn);
         logger.trace("Switching writer, currentPartitionsToWrite = {}", currentPartitionsToWrite);
         sstableWriter.switchWriter(writer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 8655044..7bca924 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -7,6 +7,7 @@ import java.util.function.BiFunction;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionIterator;
@@ -15,6 +16,8 @@ import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -185,6 +188,18 @@ public interface Index
      */
     public boolean shouldBuildBlocking();
 
+    /**
+     * Get a flush observer to observe partition/cell events generated by flushing an SSTable (memtable flush or compaction).
+     *
+     * @param descriptor The descriptor of the sstable the observer is requested for.
+     * @param opType The type of the operation which requests the observer, e.g. memtable flush or compaction.
+     *
+     * @return SSTable flush observer; the default implementation returns {@code null}.
+     */
+    default SSTableFlushObserver getFlushObserver(Descriptor descriptor, OperationType opType)
+    {
+        return null;
+    }
 
     /*
      * Index selection

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 62348ec..0213fd5 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -22,6 +22,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -65,7 +66,8 @@ abstract class AbstractSSTableSimpleWriter implements Closeable
                                        0,
                                        ActiveRepairService.UNREPAIRED_SSTABLE,
                                        0,
-                                       new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS));
+                                       new SerializationHeader(true, metadata, columns, EncodingStats.NO_STATS),
+                                       Collections.emptySet());
     }
 
     private static Descriptor createDescriptor(File directory, final String keyspace, final String columnFamily, final SSTableFormat.Type fmt)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index e889d85..5286ac5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.concurrent.Transactional;
@@ -102,12 +103,18 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem
     }
 
     @SuppressWarnings("resource") // log and writer closed during postCleanup
-    public static SSTableTxnWriter create(CFMetaData cfm, Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header)
+    public static SSTableTxnWriter create(CFMetaData cfm,
+                                          Descriptor descriptor,
+                                          long keyCount,
+                                          long repairedAt,
+                                          int sstableLevel,
+                                          SerializationHeader header,
+                                          Collection<Index> indexes)
     {
         // if the column family store does not exist, we create a new default SSTableMultiWriter to use:
         LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
         MetadataCollector collector = new MetadataCollector(cfm.comparator).sstableLevel(sstableLevel);
-        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, txn);
+        SSTableMultiWriter writer = SimpleSSTableMultiWriter.create(descriptor, keyCount, repairedAt, cfm, collector, header, indexes, txn);
         return new SSTableTxnWriter(txn, writer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
index fd1b9a7..68dbd74 100644
--- a/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SimpleSSTableMultiWriter.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
@@ -109,9 +110,10 @@ public class SimpleSSTableMultiWriter implements SSTableMultiWriter
                                             CFMetaData cfm,
                                             MetadataCollector metadataCollector,
                                             SerializationHeader header,
+                                            Collection<Index> indexes,
                                             LifecycleTransaction txn)
     {
-        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, txn);
+        SSTableWriter writer = SSTableWriter.create(descriptor, keyCount, repairedAt, cfm, metadataCollector, header, indexes, txn);
         return new SimpleSSTableMultiWriter(writer);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
new file mode 100644
index 0000000..d6f54e2
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableFlushObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.rows.ColumnData;
+
+/**
+ * Observer for events in the lifecycle of writing out an sstable.
+ */
+public interface SSTableFlushObserver
+{
+    /**
+     * Called before writing any data to the sstable.
+     */
+    void begin();
+
+    /**
+     * Called when a new partition is being written to the sstable,
+     * but before any cells are processed (see {@link #nextCell(ColumnData)}).
+     *
+     * @param key The key being appended to SSTable.
+     * @param indexPosition The position of the key in the SSTable PRIMARY_INDEX file.
+     */
+    void startPartition(DecoratedKey key, long indexPosition);
+
+    /**
+     * Called after the cell is written to the sstable.
+     * Will be preceded by a call to {@code startPartition(DecoratedKey, long)},
+     * and the cell should be assumed to belong to that partition.
+     *
+     * @param cell The cell being added to the row.
+     */
+    void nextCell(ColumnData cell);
+
+    /**
+     * Called when all data is written to the file and it's ready to be finished up.
+     */
+    void complete();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 4cbbd70..3203964 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -18,20 +18,21 @@
 
 package org.apache.cassandra.io.sstable.format;
 
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
@@ -58,6 +59,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
     protected final SerializationHeader header;
     protected final TransactionalProxy txnProxy = txnProxy();
+    protected final Collection<SSTableFlushObserver> observers;
 
     protected abstract TransactionalProxy txnProxy();
 
@@ -69,12 +71,13 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         protected boolean openResult;
     }
 
-    protected SSTableWriter(Descriptor descriptor, 
-                            long keyCount, 
-                            long repairedAt, 
-                            CFMetaData metadata, 
-                            MetadataCollector metadataCollector, 
-                            SerializationHeader header)
+    protected SSTableWriter(Descriptor descriptor,
+                            long keyCount,
+                            long repairedAt,
+                            CFMetaData metadata,
+                            MetadataCollector metadataCollector,
+                            SerializationHeader header,
+                            Collection<SSTableFlushObserver> observers)
     {
         super(descriptor, components(metadata), metadata);
         this.keyCount = keyCount;
@@ -82,6 +85,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         this.metadataCollector = metadataCollector;
         this.header = header;
         this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
+        this.observers = observers == null ? Collections.emptySet() : observers;
     }
 
     public static SSTableWriter create(Descriptor descriptor,
@@ -90,16 +94,23 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        CFMetaData metadata,
                                        MetadataCollector metadataCollector,
                                        SerializationHeader header,
+                                       Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         Factory writerFactory = descriptor.getFormat().getWriterFactory();
-        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+        return writerFactory.open(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers(descriptor, indexes, txn.opType()), txn);
     }
 
-    public static SSTableWriter create(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleTransaction txn)
+    public static SSTableWriter create(Descriptor descriptor,
+                                       long keyCount,
+                                       long repairedAt,
+                                       int sstableLevel,
+                                       SerializationHeader header,
+                                       Collection<Index> indexes,
+                                       LifecycleTransaction txn)
     {
         CFMetaData metadata = Schema.instance.getCFMetaData(descriptor);
-        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, txn);
+        return create(metadata, descriptor, keyCount, repairedAt, sstableLevel, header, indexes, txn);
     }
 
     public static SSTableWriter create(CFMetaData metadata,
@@ -108,21 +119,34 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                        long repairedAt,
                                        int sstableLevel,
                                        SerializationHeader header,
+                                       Collection<Index> indexes,
                                        LifecycleTransaction txn)
     {
         MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel);
-        return create(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+        return create(descriptor, keyCount, repairedAt, metadata, collector, header, indexes, txn);
     }
 
-    public static SSTableWriter create(String filename, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header,LifecycleTransaction txn)
+    public static SSTableWriter create(String filename,
+                                       long keyCount,
+                                       long repairedAt,
+                                       int sstableLevel,
+                                       SerializationHeader header,
+                                       Collection<Index> indexes,
+                                       LifecycleTransaction txn)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, txn);
+        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, sstableLevel, header, indexes, txn);
     }
 
     @VisibleForTesting
-    public static SSTableWriter create(String filename, long keyCount, long repairedAt, SerializationHeader header, LifecycleTransaction txn)
+    public static SSTableWriter create(String filename,
+                                       long keyCount,
+                                       long repairedAt,
+                                       SerializationHeader header,
+                                       Collection<Index> indexes,
+                                       LifecycleTransaction txn)
     {
-        return create(Descriptor.fromFilename(filename), keyCount, repairedAt, 0, header, txn);
+        Descriptor descriptor = Descriptor.fromFilename(filename);
+        return create(descriptor, keyCount, repairedAt, 0, header, indexes, txn);
     }
 
     private static Set<Component> components(CFMetaData metadata)
@@ -150,6 +174,27 @@ public abstract class SSTableWriter extends SSTable implements Transactional
         return components;
     }
 
+    // Collects the flush observers offered by the given indexes; indexes that return null are skipped,
+    // begin() is invoked on each observer kept, and a null index collection yields an empty list.
+    private static Collection<SSTableFlushObserver> observers(Descriptor descriptor,
+                                                              Collection<Index> indexes,
+                                                              OperationType operationType)
+    {
+        if (indexes == null)
+            return Collections.emptyList();
+
+        ImmutableList.Builder<SSTableFlushObserver> started = ImmutableList.builder();
+        for (Index candidate : indexes)
+        {
+            SSTableFlushObserver flushObserver = candidate.getFlushObserver(descriptor, operationType);
+            if (flushObserver == null)
+                continue; // no observer offered by this index
+            flushObserver.begin();
+            started.add(flushObserver);
+        }
+        return started.build();
+    }
+
     public abstract void mark();
 
     /**
@@ -211,6 +256,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
     {
         setOpenResult(openResult);
         txnProxy.finish();
+        observers.forEach(SSTableFlushObserver::complete);
         return finished();
     }
 
@@ -285,6 +331,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional
                                            CFMetaData metadata,
                                            MetadataCollector metadataCollector,
                                            SerializationHeader header,
+                                           Collection<SSTableFlushObserver> observers,
                                            LifecycleTransaction txn);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 1f2a98f..e030b5b 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable.format.big;
 
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -25,10 +26,7 @@ import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.SSTableFormat;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
-import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.*;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.net.MessagingService;
@@ -88,9 +86,10 @@ public class BigFormat implements SSTableFormat
                                   CFMetaData metadata,
                                   MetadataCollector metadataCollector,
                                   SerializationHeader header,
+                                  Collection<SSTableFlushObserver> observers,
                                   LifecycleTransaction txn)
         {
-            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, txn);
+            return new BigTableWriter(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers, txn);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index b6077e0..5fe9147 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -18,12 +18,14 @@
 package org.apache.cassandra.io.sstable.format.big;
 
 import java.io.*;
+import java.util.Collection;
 import java.util.Map;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 
@@ -63,9 +65,10 @@ public class BigTableWriter extends SSTableWriter
                           CFMetaData metadata, 
                           MetadataCollector metadataCollector, 
                           SerializationHeader header,
+                          Collection<SSTableFlushObserver> observers,
                           LifecycleTransaction txn)
     {
-        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header);
+        super(descriptor, keyCount, repairedAt, metadata, metadataCollector, header, observers);
         txn.trackNew(this); // must track before any files are created
 
         if (compression)
@@ -143,10 +146,11 @@ public class BigTableWriter extends SSTableWriter
             return null;
 
         long startPosition = beforeAppend(key);
+        observers.forEach((o) -> o.startPartition(key, iwriter.indexFile.position()));
 
         try (UnfilteredRowIterator collecting = Transformation.apply(iterator, new StatsCollector(metadataCollector)))
         {
-            ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, descriptor.version);
+            ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, dataFile, header, observers, descriptor.version);
 
             RowIndexEntry entry = RowIndexEntry.create(startPosition, collecting.partitionLevelDeletion(), index);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index 62c88a0..0c7ee59 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -135,7 +135,7 @@ public class RowIndexEntryTest extends CQLTester
         File tempFile = File.createTempFile("row_index_entry_test", null);
         tempFile.deleteOnExit();
         SequentialWriter writer = SequentialWriter.open(tempFile);
-        ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, BigFormat.latestVersion);
+        ColumnIndex columnIndex = ColumnIndex.writeAndBuildIndex(partition.unfilteredIterator(), writer, header, Collections.emptySet(), BigFormat.latestVersion);
         RowIndexEntry<IndexHelper.IndexInfo> withIndex = RowIndexEntry.create(0xdeadbeef, DeletionTime.LIVE, columnIndex);
         IndexHelper.IndexInfo.Serializer indexSerializer = new IndexHelper.IndexInfo.Serializer(cfs.metadata, BigFormat.latestVersion, header);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index d5baec8..27b774d 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -652,7 +652,7 @@ public class ScrubTest
         TestWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata,
                    MetadataCollector collector, SerializationHeader header, LifecycleTransaction txn)
         {
-            super(descriptor, keyCount, repairedAt, metadata, collector, header, txn);
+            super(descriptor, keyCount, repairedAt, metadata, collector, header, Collections.emptySet(), txn);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
index 4fbbb36..bab9c90 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -168,6 +168,7 @@ public class RealTransactionsTest extends SchemaLoader
                                                            0,
                                                            0,
                                                            SerializationHeader.make(cfs.metadata, txn.originals()),
+                                                           cfs.indexManager.listIndexes(),
                                                            txn));
                 while (ci.hasNext())
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index bfe7b08..de9b357 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -1011,7 +1011,7 @@ public class SSTableRewriterTest extends SchemaLoader
     public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
     {
         String filename = cfs.getSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(true, cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f81a91d3/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
new file mode 100644
index 0000000..29ad387
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable.format;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+public class SSTableFlushObserverTest // exercises the SSTableFlushObserver callbacks through a BigTableWriter
+{
+    private static final String KS_NAME = "test";
+    private static final String CF_NAME = "flush_observer";
+
+    @Test
+    public void testFlushObserver() // writes three partitions, then compares what the observer recorded with what was written
+    {
+        CFMetaData cfm = CFMetaData.Builder.create(KS_NAME, CF_NAME)
+                                           .addPartitionKey("id", UTF8Type.instance)
+                                           .addRegularColumn("first_name", UTF8Type.instance)
+                                           .addRegularColumn("age", Int32Type.instance)
+                                           .addRegularColumn("height", LongType.instance)
+                                           .build();
+
+        LifecycleTransaction transaction = LifecycleTransaction.offline(OperationType.COMPACTION);
+        FlushObserver observer = new FlushObserver();
+
+        String sstableDirectory = DatabaseDescriptor.getAllDataFileLocations()[0];
+        File directory = new File(sstableDirectory + File.separator + KS_NAME + File.separator + CF_NAME); // name separator, not the path-list separator
+        directory.deleteOnExit();
+
+        if (!directory.exists() && !directory.mkdirs())
+            throw new FSWriteError(new IOException("failed to create tmp directory"), directory.getAbsolutePath());
+
+        SSTableFormat.Type sstableFormat = DatabaseDescriptor.getSSTableFormat();
+
+        BigTableWriter writer = new BigTableWriter(new Descriptor(sstableFormat.info.getLatestVersion().version,
+                                                                  directory,
+                                                                  KS_NAME, CF_NAME,
+                                                                  0,
+                                                                  sstableFormat),
+                                                   10L, 0L, cfm,
+                                                   new MetadataCollector(cfm.comparator).sstableLevel(0),
+                                                   new SerializationHeader(true, cfm, cfm.partitionColumns(), EncodingStats.NO_STATS),
+                                                   Collections.singletonList(observer),
+                                                   transaction);
+
+        SSTableReader reader = null;
+        Multimap<ByteBuffer, Cell> expected = ArrayListMultimap.create(); // cells written, grouped by partition key
+
+        try
+        {
+            final long now = System.currentTimeMillis();
+
+            ByteBuffer key = UTF8Type.instance.fromString("key1");
+            expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now, UTF8Type.instance.fromString("jack")),
+                                               BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(27)),
+                                               BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(183L))));
+
+            writer.append(new RowIterator(cfm, key.duplicate(), Collections.singletonList(buildRow(expected.get(key)))));
+
+            key = UTF8Type.instance.fromString("key2");
+            expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now, UTF8Type.instance.fromString("jim")),
+                                               BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)),
+                                               BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(180L))));
+
+            writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
+
+            key = UTF8Type.instance.fromString("key3");
+            expected.putAll(key, Arrays.asList(BufferCell.live(cfm, getColumn(cfm, "first_name"), now, UTF8Type.instance.fromString("ken")),
+                                               BufferCell.live(cfm, getColumn(cfm, "age"), now, Int32Type.instance.decompose(30)),
+                                               BufferCell.live(cfm, getColumn(cfm, "height"), now, LongType.instance.decompose(178L))));
+
+            writer.append(new RowIterator(cfm, key, Collections.singletonList(buildRow(expected.get(key)))));
+
+            reader = writer.finish(true); // observer.isComplete is asserted once this has returned
+        }
+        finally
+        {
+            FileUtils.closeQuietly(writer);
+        }
+
+        Assert.assertTrue(observer.isComplete);
+        Assert.assertEquals(expected.size(), observer.rows.size()); // Multimap.size() counts entries, i.e. cells
+
+        for (Pair<ByteBuffer, Long> e : observer.rows.keySet())
+        {
+            ByteBuffer key = e.left;
+            Long indexPosition = e.right;
+
+            try (FileDataInput index = reader.ifile.createReader(indexPosition)) // read the index file at the recorded position
+            {
+                ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(index);
+                Assert.assertEquals(0, UTF8Type.instance.compare(key, indexKey)); // the key stored there must be this partition's key
+            }
+            catch (IOException ex)
+            {
+                throw new FSReadError(ex, reader.getIndexFilename());
+            }
+
+            Assert.assertEquals(expected.get(key), observer.rows.get(e));
+        }
+    }
+
+    private static class RowIterator extends AbstractUnfilteredRowIterator // iterates a fixed collection of unfiltereds for one key
+    {
+        private final Iterator<Unfiltered> rows;
+
+        public RowIterator(CFMetaData cfm, ByteBuffer key, Collection<Unfiltered> content)
+        {
+            super(cfm,
+                  DatabaseDescriptor.getPartitioner().decorateKey(key),
+                  DeletionTime.LIVE,
+                  cfm.partitionColumns(),
+                  BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING),
+                  false,
+                  EncodingStats.NO_STATS);
+
+            rows = content.iterator();
+        }
+
+        @Override
+        protected Unfiltered computeNext()
+        {
+            return rows.hasNext() ? rows.next() : endOfData();
+        }
+    }
+
+    private static class FlushObserver implements SSTableFlushObserver // records every callback so the test can inspect it afterwards
+    {
+        private final Multimap<Pair<ByteBuffer, Long>, Cell> rows = ArrayListMultimap.create(); // cells seen per (key, index position)
+        private Pair<ByteBuffer, Long> currentKey; // set by startPartition, used to group the cells that follow
+        private boolean isComplete;
+
+        @Override
+        public void begin()
+        {}
+
+        @Override
+        public void startPartition(DecoratedKey key, long indexPosition)
+        {
+            currentKey = Pair.create(key.getKey(), indexPosition);
+        }
+
+        @Override
+        public void nextCell(ColumnData cell)
+        {
+            rows.put(currentKey, (Cell) cell);
+        }
+
+        @Override
+        public void complete()
+        {
+            isComplete = true;
+        }
+    }
+
+    private static Row buildRow(Collection<Cell> cells) // assembles one row with an empty clustering from the given cells
+    {
+        Row.Builder rowBuilder = BTreeRow.sortedBuilder();
+        rowBuilder.newRow(Clustering.EMPTY);
+        cells.forEach(rowBuilder::addCell);
+        return rowBuilder.build();
+    }
+
+    private static ColumnDefinition getColumn(CFMetaData cfm, String name) // looks up a column definition by its UTF-8 name
+    {
+        return cfm.getColumnDefinition(UTF8Type.instance.fromString(name));
+    }
+}


Mime
View raw message