cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Track presence of legacy counter shards in sstables
Date Fri, 04 Apr 2014 14:41:01 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk f4e8fc3f6 -> 0015f37a3


Track presence of legacy counter shards in sstables

patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for
CASSANDRA-6888


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57b18e60
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57b18e60
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57b18e60

Branch: refs/heads/trunk
Commit: 57b18e600c6d79d19d29f3569b81cb946ef9ee57
Parents: 6d901f9
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Fri Apr 4 17:36:15 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Apr 4 17:36:15 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnFamily.java   | 12 ++-
 .../org/apache/cassandra/db/CounterCell.java    |  5 ++
 .../db/compaction/LazilyCompactedRow.java       | 12 +--
 .../cassandra/db/context/CounterContext.java    | 18 +++++
 .../cassandra/io/sstable/ColumnStats.java       | 12 ++-
 .../apache/cassandra/io/sstable/Descriptor.java |  3 +
 .../cassandra/io/sstable/SSTableWriter.java     | 26 ++++---
 .../metadata/LegacyMetadataSerializer.java      |  1 +
 .../io/sstable/metadata/MetadataCollector.java  | 67 ++++++++++-------
 .../io/sstable/metadata/StatsMetadata.java      | 14 ++++
 .../io/sstable/SSTableMetadataTest.java         | 77 +++++++++++++++++---
 12 files changed, 194 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ac2f624..4cfc957 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,7 @@
  * Optimize CounterColumn#reconcile() (CASSANDRA-6953)
  * Properly remove 1.2 sstable support in 2.1 (CASSANDRA-6869)
  * Lock counter cells, not partitions (CASSANDRA-6880)
+ * Track presence of legacy counter shards in sstables (CASSANDRA-6888)
 Merged from 2.0:
  * Allow compaction of system tables during startup (CASSANDRA-6913)
  * Restrict Windows to parallel repairs (CASSANDRA-6907)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index e7aab37..da404b0 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -402,6 +402,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
         int maxLocalDeletionTime = Integer.MIN_VALUE;
         List<ByteBuffer> minColumnNamesSeen = Collections.emptyList();
         List<ByteBuffer> maxColumnNamesSeen = Collections.emptyList();
+        boolean hasLegacyCounterShards = false;
         for (Cell cell : this)
         {
             if (deletionInfo().getTopLevelDeletion().localDeletionTime < Integer.MAX_VALUE)
@@ -420,8 +421,17 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
                 tombstones.update(deletionTime);
             minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name,
metadata.comparator);
             maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name,
metadata.comparator);
+            if (cell instanceof CounterCell)
+                hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
         }
-        return new ColumnStats(getColumnCount(), minTimestampSeen, maxTimestampSeen, maxLocalDeletionTime,
tombstones, minColumnNamesSeen, maxColumnNamesSeen);
+        return new ColumnStats(getColumnCount(),
+                               minTimestampSeen,
+                               maxTimestampSeen,
+                               maxLocalDeletionTime,
+                               tombstones,
+                               minColumnNamesSeen,
+                               maxColumnNamesSeen,
+                               hasLegacyCounterShards);
     }
 
     public boolean isMarkedForDelete()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 6b588ef..fc4ac3f 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -182,6 +182,11 @@ public class CounterCell extends Cell
                                    Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
     }
 
+    public boolean hasLegacyShards()
+    {
+        return contextManager.hasLegacyShards(value);
+    }
+
     @Override
     public boolean equals(Object o)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 12a9308..2fefe0d 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
@@ -125,8 +124,8 @@ public class LazilyCompactedRow extends AbstractCompactedRow
                                       reducer.maxLocalDeletionTimeSeen,
                                       reducer.tombstones,
                                       reducer.minColumnNameSeen,
-                                      reducer.maxColumnNameSeen
-        );
+                                      reducer.maxColumnNameSeen,
+                                      reducer.hasLegacyCounterShards);
 
         // in case no columns were ever written, we may still need to write an empty header
with a top-level tombstone
         indexBuilder.maybeWriteEmptyRowHeader();
@@ -202,6 +201,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow
         StreamingHistogram tombstones = new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE);
         List<ByteBuffer> minColumnNameSeen = Collections.emptyList();
         List<ByteBuffer> maxColumnNameSeen = Collections.emptyList();
+        boolean hasLegacyCounterShards = false;
 
         /**
          * Called once per version of a cell that we need to merge, after which getReduced()
is called.  In other words,
@@ -293,9 +293,11 @@ public class LazilyCompactedRow extends AbstractCompactedRow
 
                 int deletionTime = reduced.getLocalDeletionTime();
                 if (deletionTime < Integer.MAX_VALUE)
-                {
                     tombstones.update(deletionTime);
-                }
+
+                if (reduced instanceof CounterCell)
+                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) reduced).hasLegacyShards();
+
                 return reduced;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/db/context/CounterContext.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/context/CounterContext.java b/src/java/org/apache/cassandra/db/context/CounterContext.java
index 0b1677b..455ffc7 100644
--- a/src/java/org/apache/cassandra/db/context/CounterContext.java
+++ b/src/java/org/apache/cassandra/db/context/CounterContext.java
@@ -546,6 +546,24 @@ public class CounterContext
     }
 
     /**
+     * Detects whether or not the context has any legacy (local or remote) shards in it.
+     */
+    public boolean hasLegacyShards(ByteBuffer context)
+    {
+        int totalCount = (context.remaining() - headerLength(context)) / STEP_LENGTH;
+        int localAndGlobalCount = Math.abs(context.getShort(context.position()));
+
+        if (localAndGlobalCount < totalCount)
+            return true; // remote shard(s) present
+
+        for (int i = 0; i < localAndGlobalCount; i++)
+            if (context.getShort(context.position() + HEADER_SIZE_LENGTH + i * HEADER_ELT_LENGTH)
>= 0)
+                return true; // found a local shard
+
+        return false;
+    }
+
+    /**
      * Mark context to delete local references afterward.
      * Marking is done by multiply #elt by -1 to preserve header length
      * and #elt count in order to clear all local refs later.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
index bd3bd1c..d09f965 100644
--- a/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
+++ b/src/java/org/apache/cassandra/io/sstable/ColumnStats.java
@@ -41,7 +41,16 @@ public class ColumnStats
     public final List<ByteBuffer> minColumnNames;
     public final List<ByteBuffer> maxColumnNames;
 
-    public ColumnStats(int columnCount, long minTimestamp, long maxTimestamp, int maxLocalDeletionTime,
StreamingHistogram tombstoneHistogram, List<ByteBuffer> minColumnNames, List<ByteBuffer>
maxColumnNames)
+    public final boolean hasLegacyCounterShards;
+
+    public ColumnStats(int columnCount,
+                       long minTimestamp,
+                       long maxTimestamp,
+                       int maxLocalDeletionTime,
+                       StreamingHistogram tombstoneHistogram,
+                       List<ByteBuffer> minColumnNames,
+                       List<ByteBuffer> maxColumnNames,
+                       boolean hasLegacyCounterShards)
     {
         this.minTimestamp = minTimestamp;
         this.maxTimestamp = maxTimestamp;
@@ -50,5 +59,6 @@ public class ColumnStats
         this.tombstoneHistogram = tombstoneHistogram;
         this.minColumnNames = minColumnNames;
         this.maxColumnNames = maxColumnNames;
+        this.hasLegacyCounterShards = hasLegacyCounterShards;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index db6f13a..18609bf 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -62,6 +62,7 @@ public class Descriptor
         // ka (2.1.0): new Statistics.db file format
         //             index summaries can be downsampled and the sampling level is persisted
         //             switch uncompressed checksums to adler32
+        //             tracks presense of legacy (local and remote) counter shards
 
         public static final Version CURRENT = new Version(current_version);
 
@@ -73,6 +74,7 @@ public class Descriptor
         public final boolean newStatsFile;
         public final boolean hasAllAdlerChecksums;
         public final boolean hasRepairedAt;
+        public final boolean tracksLegacyCounterShards;
 
         public Version(String version)
         {
@@ -83,6 +85,7 @@ public class Descriptor
             newStatsFile = version.compareTo("ka") >= 0;
             hasAllAdlerChecksums = version.compareTo("ka") >= 0;
             hasRepairedAt = version.compareTo("ka") >= 0;
+            tracksLegacyCounterShards = version.compareTo("ka") >= 0;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 1dc2c98..4a7729e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -227,8 +227,9 @@ public class SSTableWriter extends SSTable
         List<ByteBuffer> minColumnNames = Collections.emptyList();
         List<ByteBuffer> maxColumnNames = Collections.emptyList();
         StreamingHistogram tombstones = new StreamingHistogram(TOMBSTONE_HISTOGRAM_BIN_SIZE);
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
+        boolean hasLegacyCounterShards = false;
 
+        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata);
         cf.delete(DeletionTime.serializer.deserialize(in));
 
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream);
@@ -253,14 +254,16 @@ public class SSTableWriter extends SSTable
                 OnDiskAtom atom = iter.next();
                 if (atom == null)
                     break;
+
                 if (atom instanceof CounterCell)
+                {
                     atom = ((CounterCell) atom).markLocalToBeCleared();
+                    hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) atom).hasLegacyShards();
+                }
 
                 int deletionTime = atom.getLocalDeletionTime();
                 if (deletionTime < Integer.MAX_VALUE)
-                {
                     tombstones.update(deletionTime);
-                }
                 minTimestamp = Math.min(minTimestamp, atom.timestamp());
                 maxTimestamp = Math.max(maxTimestamp, atom.timestamp());
                 minColumnNames = ColumnNameHelper.minComponents(minColumnNames, atom.name(),
metadata.comparator);
@@ -278,14 +281,15 @@ public class SSTableWriter extends SSTable
             throw new FSWriteError(e, dataFile.getPath());
         }
 
-        sstableMetadataCollector.updateMinTimestamp(minTimestamp);
-        sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
-        sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
-        sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
-        sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
-        sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
-        sstableMetadataCollector.updateMinColumnNames(minColumnNames);
-        sstableMetadataCollector.updateMaxColumnNames(maxColumnNames);
+        sstableMetadataCollector.updateMinTimestamp(minTimestamp)
+                                .updateMaxTimestamp(maxTimestamp)
+                                .updateMaxLocalDeletionTime(maxLocalDeletionTime)
+                                .addRowSize(dataFile.getFilePointer() - currentPosition)
+                                .addColumnCount(columnIndexer.writtenAtomCount())
+                                .mergeTombstoneHistogram(tombstones)
+                                .updateMinColumnNames(minColumnNames)
+                                .updateMaxColumnNames(maxColumnNames)
+                                .updateHasLegacyCounterShards(hasLegacyCounterShards);
         afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, cf.deletionInfo().getTopLevelDeletion(),
columnIndexer.build()));
         return currentPosition;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
index 59f7be5..4bd060e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/LegacyMetadataSerializer.java
@@ -133,6 +133,7 @@ public class LegacyMetadataSerializer extends MetadataSerializer
                                                      sstableLevel,
                                                      minColumnNames,
                                                      maxColumnNames,
+                                                     true,
                                                      ActiveRepairService.UNREPAIRED_SSTABLE));
                 if (types.contains(MetadataType.COMPACTION))
                     components.put(MetadataType.COMPACTION,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 84c35c7..84789a6 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -21,10 +21,10 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
 import com.google.common.collect.Maps;
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.sstable.*;
@@ -67,6 +67,7 @@ public class MetadataCollector
                                  0,
                                  Collections.<ByteBuffer>emptyList(),
                                  Collections.<ByteBuffer>emptyList(),
+                                 true,
                                  ActiveRepairService.UNREPAIRED_SSTABLE);
     }
 
@@ -82,6 +83,8 @@ public class MetadataCollector
     protected int sstableLevel;
     protected List<ByteBuffer> minColumnNames = Collections.emptyList();
     protected List<ByteBuffer> maxColumnNames = Collections.emptyList();
+    protected boolean hasLegacyCounterShards = false;
+
     /**
      * Default cardinality estimation method is to use HyperLogLog++.
      * Parameter here(p=13, sp=25) should give reasonable estimation
@@ -108,56 +111,62 @@ public class MetadataCollector
         {
             addAncestor(sstable.descriptor.generation);
             for (Integer i : sstable.getAncestors())
-            {
                 if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
                     addAncestor(i);
-            }
         }
     }
 
-    public void addKey(ByteBuffer key)
+    public MetadataCollector addKey(ByteBuffer key)
     {
         long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
         cardinality.offerHashed(hashed);
+        return this;
     }
 
-    public void addRowSize(long rowSize)
+    public MetadataCollector addRowSize(long rowSize)
     {
         estimatedRowSize.add(rowSize);
+        return this;
     }
 
-    public void addColumnCount(long columnCount)
+    public MetadataCollector addColumnCount(long columnCount)
     {
         estimatedColumnCount.add(columnCount);
+        return this;
     }
 
-    public void mergeTombstoneHistogram(StreamingHistogram histogram)
+    public MetadataCollector mergeTombstoneHistogram(StreamingHistogram histogram)
     {
         estimatedTombstoneDropTime.merge(histogram);
+        return this;
     }
 
     /**
      * Ratio is compressed/uncompressed and it is
      * if you have 1.x then compression isn't helping
      */
-    public void addCompressionRatio(long compressed, long uncompressed)
+    public MetadataCollector addCompressionRatio(long compressed, long uncompressed)
     {
         compressionRatio = (double) compressed/uncompressed;
+        return this;
     }
 
-    public void updateMinTimestamp(long potentialMin)
+    public MetadataCollector updateMinTimestamp(long potentialMin)
     {
         minTimestamp = Math.min(minTimestamp, potentialMin);
+        return this;
     }
 
-    public void updateMaxTimestamp(long potentialMax)
+    public MetadataCollector updateMaxTimestamp(long potentialMax)
     {
         maxTimestamp = Math.max(maxTimestamp, potentialMax);
+        return this;
     }
 
-    public void updateMaxLocalDeletionTime(int maxLocalDeletionTime)
+    public MetadataCollector updateMaxLocalDeletionTime(int maxLocalDeletionTime)
     {
         this.maxLocalDeletionTime = Math.max(this.maxLocalDeletionTime, maxLocalDeletionTime);
+        return this;
     }
 
     public MetadataCollector estimatedRowSize(EstimatedHistogram estimatedRowSize)
@@ -184,18 +193,6 @@ public class MetadataCollector
         return this;
     }
 
-    public void update(long size, ColumnStats stats)
-    {
-        updateMinTimestamp(stats.minTimestamp);
-        updateMaxTimestamp(stats.maxTimestamp);
-        updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
-        addRowSize(size);
-        addColumnCount(stats.columnCount);
-        mergeTombstoneHistogram(stats.tombstoneHistogram);
-        updateMinColumnNames(stats.minColumnNames);
-        updateMaxColumnNames(stats.maxColumnNames);
-    }
-
     public MetadataCollector sstableLevel(int sstableLevel)
     {
         this.sstableLevel = sstableLevel;
@@ -216,6 +213,26 @@ public class MetadataCollector
         return this;
     }
 
+    public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
+    {
+        this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
+        return this;
+    }
+
+    public MetadataCollector update(long rowSize, ColumnStats stats)
+    {
+        updateMinTimestamp(stats.minTimestamp);
+        updateMaxTimestamp(stats.maxTimestamp);
+        updateMaxLocalDeletionTime(stats.maxLocalDeletionTime);
+        addRowSize(rowSize);
+        addColumnCount(stats.columnCount);
+        mergeTombstoneHistogram(stats.tombstoneHistogram);
+        updateMinColumnNames(stats.minColumnNames);
+        updateMaxColumnNames(stats.maxColumnNames);
+        updateHasLegacyCounterShards(stats.hasLegacyCounterShards);
+        return this;
+    }
+
     public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner,
double bloomFilterFPChance, long repairedAt)
     {
         Map<MetadataType, MetadataComponent> components = Maps.newHashMap();
@@ -231,9 +248,9 @@ public class MetadataCollector
                                                              sstableLevel,
                                                              minColumnNames,
                                                              maxColumnNames,
+                                                             hasLegacyCounterShards,
                                                              repairedAt));
         components.put(MetadataType.COMPACTION, new CompactionMetadata(ancestors, cardinality));
         return components;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index 1c3dfd5..900bd4e 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -52,6 +52,7 @@ public class StatsMetadata extends MetadataComponent
     public final int sstableLevel;
     public final List<ByteBuffer> maxColumnNames;
     public final List<ByteBuffer> minColumnNames;
+    public final boolean hasLegacyCounterShards;
     public final long repairedAt;
 
     public StatsMetadata(EstimatedHistogram estimatedRowSize,
@@ -65,6 +66,7 @@ public class StatsMetadata extends MetadataComponent
                          int sstableLevel,
                          List<ByteBuffer> minColumnNames,
                          List<ByteBuffer> maxColumnNames,
+                         boolean hasLegacyCounterShards,
                          long repairedAt)
     {
         this.estimatedRowSize = estimatedRowSize;
@@ -78,6 +80,7 @@ public class StatsMetadata extends MetadataComponent
         this.sstableLevel = sstableLevel;
         this.minColumnNames = minColumnNames;
         this.maxColumnNames = maxColumnNames;
+        this.hasLegacyCounterShards = hasLegacyCounterShards;
         this.repairedAt = repairedAt;
     }
 
@@ -123,6 +126,7 @@ public class StatsMetadata extends MetadataComponent
                                  newLevel,
                                  maxColumnNames,
                                  minColumnNames,
+                                 hasLegacyCounterShards,
                                  repairedAt);
     }
 
@@ -139,6 +143,7 @@ public class StatsMetadata extends MetadataComponent
                                  sstableLevel,
                                  maxColumnNames,
                                  minColumnNames,
+                                 hasLegacyCounterShards,
                                  newRepairedAt);
     }
 
@@ -162,6 +167,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(repairedAt, that.repairedAt)
                        .append(maxColumnNames, that.maxColumnNames)
                        .append(minColumnNames, that.minColumnNames)
+                       .append(hasLegacyCounterShards, that.hasLegacyCounterShards)
                        .build();
     }
 
@@ -181,6 +187,7 @@ public class StatsMetadata extends MetadataComponent
                        .append(repairedAt)
                        .append(maxColumnNames)
                        .append(minColumnNames)
+                       .append(hasLegacyCounterShards)
                        .build();
     }
 
@@ -203,6 +210,7 @@ public class StatsMetadata extends MetadataComponent
             size += 4;
             for (ByteBuffer columnName : component.maxColumnNames)
                 size += 2 + columnName.remaining(); // with short length
+            size += TypeSizes.NATIVE.sizeof(component.hasLegacyCounterShards);
             return size;
         }
 
@@ -224,6 +232,7 @@ public class StatsMetadata extends MetadataComponent
             out.writeInt(component.maxColumnNames.size());
             for (ByteBuffer columnName : component.maxColumnNames)
                 ByteBufferUtil.writeWithShortLength(columnName, out);
+            out.writeBoolean(component.hasLegacyCounterShards);
         }
 
         public StatsMetadata deserialize(Descriptor.Version version, DataInput in) throws
IOException
@@ -251,6 +260,10 @@ public class StatsMetadata extends MetadataComponent
             for (int i = 0; i < colCount; i++)
                 maxColumnNames.add(ByteBufferUtil.readWithShortLength(in));
 
+            boolean hasLegacyCounterShards = true;
+            if (version.tracksLegacyCounterShards)
+                hasLegacyCounterShards = in.readBoolean();
+
             return new StatsMetadata(rowSizes,
                                      columnCounts,
                                      replayPosition,
@@ -262,6 +275,7 @@ public class StatsMetadata extends MetadataComponent
                                      sstableLevel,
                                      minColumnNames,
                                      maxColumnNames,
+                                     hasLegacyCounterShards,
                                      repairedAt);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/57b18e60/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
index 78f248b..1624a6b 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.io.sstable;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,7 @@ package org.apache.cassandra.io.sstable;
  * under the License.
  * 
  */
-
+package org.apache.cassandra.io.sstable;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
@@ -32,11 +31,16 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CounterId;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import static org.apache.cassandra.Util.cellname;
+
 public class SSTableMetadataTest extends SchemaLoader
 {
     @Test
@@ -50,14 +54,14 @@ public class SSTableMetadataTest extends SchemaLoader
             DecoratedKey key = Util.dk(Integer.toString(i));
             Mutation rm = new Mutation("Keyspace1", key.key);
             for (int j = 0; j < 10; j++)
-                rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                rm.add("Standard1", cellname(Integer.toString(j)),
                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                        timestamp,
                        10 + j);
             rm.apply();
         }
         Mutation rm = new Mutation("Keyspace1", Util.dk("longttl").key);
-        rm.add("Standard1", Util.cellname("col"),
+        rm.add("Standard1", cellname("col"),
                ByteBufferUtil.EMPTY_BYTE_BUFFER,
                timestamp,
                10000);
@@ -73,7 +77,7 @@ public class SSTableMetadataTest extends SchemaLoader
 
         }
         rm = new Mutation("Keyspace1", Util.dk("longttl2").key);
-        rm.add("Standard1", Util.cellname("col"),
+        rm.add("Standard1", cellname("col"),
                ByteBufferUtil.EMPTY_BYTE_BUFFER,
                timestamp,
                20000);
@@ -81,7 +85,7 @@ public class SSTableMetadataTest extends SchemaLoader
         ttltimestamp = (int) (System.currentTimeMillis()/1000);
         store.forceBlockingFlush();
         assertEquals(2, store.getSSTables().size());
-        List<SSTableReader> sstables = new ArrayList<SSTableReader>(store.getSSTables());
+        List<SSTableReader> sstables = new ArrayList<>(store.getSSTables());
         if(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime < sstables.get(1).getSSTableMetadata().maxLocalDeletionTime)
         {
             assertEquals(sstables.get(0).getSSTableMetadata().maxLocalDeletionTime, firstDelTime);
@@ -121,11 +125,11 @@ public class SSTableMetadataTest extends SchemaLoader
         DecoratedKey key = Util.dk("deletetest");
         Mutation rm = new Mutation("Keyspace1", key.key);
         for (int i = 0; i<5; i++)
-            rm.add("Standard2", Util.cellname("deletecolumn"+i),
+            rm.add("Standard2", cellname("deletecolumn" + i),
                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
                        timestamp,
                        100);
-        rm.add("Standard2", Util.cellname("todelete"),
+        rm.add("Standard2", cellname("todelete"),
                    ByteBufferUtil.EMPTY_BYTE_BUFFER,
                    timestamp,
                    1000);
@@ -140,7 +144,7 @@ public class SSTableMetadataTest extends SchemaLoader
             assertEquals(ttltimestamp + 1000, firstMaxDelTime, 10);
         }
         rm = new Mutation("Keyspace1", key.key);
-        rm.delete("Standard2", Util.cellname("todelete"), timestamp + 1);
+        rm.delete("Standard2", cellname("todelete"), timestamp + 1);
         rm.apply();
         store.forceBlockingFlush();
         assertEquals(2,store.getSSTables().size());
@@ -174,7 +178,7 @@ public class SSTableMetadataTest extends SchemaLoader
             Mutation rm = new Mutation("Keyspace1", key.key);
             for (int i = 100; i<150; i++)
             {
-                rm.add("Standard3", Util.cellname(j+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
+                rm.add("Standard3", cellname(j + "col" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
             }
             rm.apply();
         }
@@ -189,7 +193,7 @@ public class SSTableMetadataTest extends SchemaLoader
         Mutation rm = new Mutation("Keyspace1", key.key);
         for (int i = 101; i<299; i++)
         {
-            rm.add("Standard3", Util.cellname(9+"col"+i), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
+            rm.add("Standard3", cellname(9 + "col" + i), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.currentTimeMillis());
         }
         rm.apply();
 
@@ -202,6 +206,7 @@ public class SSTableMetadataTest extends SchemaLoader
             assertEquals(ByteBufferUtil.string(sstable.getSSTableMetadata().maxColumnNames.get(0)),
"9col298");
         }
     }
+
     @Test
     public void testMaxMinComposites() throws CharacterCodingException, ExecutionException,
InterruptedException
     {
@@ -251,4 +256,54 @@ public class SSTableMetadataTest extends SchemaLoader
             assertEquals(0, ByteBufferUtil.toInt(sstable.getSSTableMetadata().minColumnNames.get(1)));
         }
     }
+
+    @Test
+    public void testLegacyCounterShardTracking()
+    {
+        ColumnFamilyStore cfs = Keyspace.open("Keyspace1").getColumnFamilyStore("Counter1");
+
+        // A cell with all shards
+        CounterContext.ContextState state = CounterContext.ContextState.allocate(1, 1, 1);
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+        state.writeRemote(CounterId.fromInt(3), 1L, 1L);
+        ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+        new Mutation(Util.dk("k").key, cells).apply();
+        cfs.forceBlockingFlush();
+        assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+        cfs.truncateBlocking();
+
+        // A cell with global and remote shards
+        state = CounterContext.ContextState.allocate(0, 1, 1);
+        state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+        state.writeRemote(CounterId.fromInt(3), 1L, 1L);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+        new Mutation(Util.dk("k").key, cells).apply();
+        cfs.forceBlockingFlush();
+        assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+        cfs.truncateBlocking();
+
+        // A cell with global and local shards
+        state = CounterContext.ContextState.allocate(1, 1, 0);
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        state.writeLocal(CounterId.fromInt(2), 1L, 1L);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+        new Mutation(Util.dk("k").key, cells).apply();
+        cfs.forceBlockingFlush();
+        assertTrue(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+        cfs.truncateBlocking();
+
+        // A cell with global only
+        state = CounterContext.ContextState.allocate(1, 0, 0);
+        state.writeGlobal(CounterId.fromInt(1), 1L, 1L);
+        cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        cells.addColumn(new CounterCell(cellname("col"), state.context, 1L, Long.MIN_VALUE));
+        new Mutation(Util.dk("k").key, cells).apply();
+        cfs.forceBlockingFlush();
+        assertFalse(cfs.getSSTables().iterator().next().getSSTableMetadata().hasLegacyCounterShards);
+        cfs.truncateBlocking();
+    }
 }


Mime
View raw message