cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1198725 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/context/ src/java/org/apache/cassandra...
Date Mon, 07 Nov 2011 13:26:00 GMT
Author: slebresne
Date: Mon Nov  7 13:26:00 2011
New Revision: 1198725

URL: http://svn.apache.org/viewvc?rev=1198725&view=rev
Log:
patch by slebresne; reviewed by yukim for CASSANDRA-3178

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
    cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Nov  7 13:26:00 2011
@@ -38,6 +38,7 @@
  * acquire compactionlock during truncate (CASSANDRA-3399)
  * fix displaying cfdef entries for super columnfamilies (CASSANDRA-3415)
  * (Hadoop) Fix empty row filtering (CASSANDRA-3450)
+ * Make counter shard merging thread safe (CASSANDRA-3178)
 
 0.8.7
  * Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/config/CFMetaData.java
Mon Nov  7 13:26:00 2011
@@ -150,6 +150,8 @@ public final class CFMetaData
     private int memtableFlushAfterMins;               // default 60 
     private int memtableThroughputInMb;               // default based on heap size
     private double memtableOperationsInMillions;      // default based on throughput
+    // mergeShardsChance is now obsolete, but left here so as to not break
+    // thrift compatibility
     private double mergeShardsChance;                 // default 0.1, chance [0.0, 1.0] of
merging old shards during replication
     private IRowCacheProvider rowCacheProvider;
     private ByteBuffer keyAlias;                      // default NULL

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Mon Nov  7 13:26:00 2011
@@ -2308,4 +2308,16 @@ public class ColumnFamilyStore implement
 
         return reader;
     }
+
+    /**
+     * Returns the creation time of the oldest memtable not fully flushed yet.
+     */
+    public long oldestUnflushedMemtable()
+    {
+        DataTracker.View view = data.getView();
+        long oldest = view.memtable.creationTime();
+        for (Memtable memtable : view.memtablesPendingFlush)
+            oldest = Math.min(oldest, memtable.creationTime());
+        return oldest;
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterColumn.java Mon
Nov  7 13:26:00 2011
@@ -22,17 +22,24 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.concurrent.TimeoutException;
 import java.util.Arrays;
 import java.util.Map;
 
+import com.google.common.collect.Multimap;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.context.IContext.ContextRelationship;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.service.IWriteResponseHandler;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NodeId;
@@ -223,9 +230,9 @@ public class CounterColumn extends Colum
         return contextManager.hasNodeId(value(), id);
     }
 
-    public CounterColumn computeOldShardMerger()
+    private CounterColumn computeOldShardMerger(int mergeBefore)
     {
-        ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds());
+        ByteBuffer bb = contextManager.computeOldShardMerger(value(), NodeId.getOldLocalNodeIds(),
mergeBefore);
         if (bb == null)
             return null;
         else
@@ -243,8 +250,19 @@ public class CounterColumn extends Colum
         }
     }
 
-    public static void removeOldShards(ColumnFamily cf, int gcBefore)
+    /**
+     * There are two phases to the removal of old shards.
+     * First phase: we merge the old shard value into the current shard and
+     * 'nullify' the old one. We then send the counter context with the old
+     * shard nullified to all other replicas.
+     * Second phase: once an old shard has been nullified for longer than
+     * gc_grace (to be sure all other replicas are aware of the merge), we
+     * simply remove that old shard from the context (its value is 0).
+     * This method does both phases.
+     */
+    public static void mergeAndRemoveOldShards(DecoratedKey key, ColumnFamily cf, int gcBefore,
int mergeBefore)
     {
+        ColumnFamily remoteMerger = null;
         if (!cf.isSuper())
         {
             for (Map.Entry<ByteBuffer, IColumn> entry : cf.getColumnsMap().entrySet())
@@ -253,8 +271,18 @@ public class CounterColumn extends Colum
                 IColumn c = entry.getValue();
                 if (!(c instanceof CounterColumn))
                     continue;
-                CounterColumn cleaned = ((CounterColumn) c).removeOldShards(gcBefore);
-                if (cleaned != c)
+                CounterColumn cc = (CounterColumn) c;
+                CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+                CounterColumn merged = cc;
+                if (shardMerger != null)
+                {
+                    merged = (CounterColumn) cc.reconcile(shardMerger);
+                    if (remoteMerger == null)
+                        remoteMerger = cf.cloneMeShallow();
+                    remoteMerger.addColumn(merged);
+                }
+                CounterColumn cleaned = merged.removeOldShards(gcBefore);
+                if (cleaned != cc)
                 {
                     cf.remove(cname);
                     cf.addColumn(cleaned);
@@ -270,7 +298,17 @@ public class CounterColumn extends Colum
                 {
                     if (!(subColumn instanceof CounterColumn))
                         continue;
-                    CounterColumn cleaned = ((CounterColumn) subColumn).removeOldShards(gcBefore);
+                    CounterColumn cc = (CounterColumn) subColumn;
+                    CounterColumn shardMerger = cc.computeOldShardMerger(mergeBefore);
+                    CounterColumn merged = cc;
+                    if (shardMerger != null)
+                    {
+                        merged = (CounterColumn) cc.reconcile(shardMerger);
+                        if (remoteMerger == null)
+                            remoteMerger = cf.cloneMeShallow();
+                        remoteMerger.addColumn(c.name(), merged);
+                    }
+                    CounterColumn cleaned = merged.removeOldShards(gcBefore);
                     if (cleaned != subColumn)
                     {
                         c.remove(subColumn.name());
@@ -279,6 +317,40 @@ public class CounterColumn extends Colum
                 }
             }
         }
+
+        if (remoteMerger != null)
+        {
+            try
+            {
+                sendToOtherReplica(key, remoteMerger);
+            }
+            catch (Exception e)
+            {
+                logger.error("Error while sending shard merger mutation to remote endpoints",
e);
+            }
+        }
     }
 
+    private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws UnavailableException,
TimeoutException, IOException
+    {
+        RowMutation rm = new RowMutation(cf.metadata().ksName, key.key);
+        rm.add(cf);
+
+        final InetAddress local = FBUtilities.getLocalAddress();
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
+
+        StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+        {
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress>
hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel
consistency_level) throws IOException
+            {
+                // We should only send to the remote replica, not the local one
+                hintedEndpoints.remove(local, local);
+                // Fake local response to be a good lad but we won't wait on the responseHandler
+                responseHandler.response(null);
+                StorageProxy.sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints,
responseHandler, localDataCenter, consistency_level);
+            }
+        });
+
+        // we don't wait for answers
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/CounterMutation.java
Mon Nov  7 13:26:00 2011
@@ -106,7 +106,6 @@ public class CounterMutation implements 
             if (row == null || row.cf == null)
                 continue;
 
-            row = mergeOldShards(readCommand.table, row);
             ColumnFamily cf = row.cf;
             if (cf.isSuper())
                 cf.retainAll(rowMutation.getColumnFamily(cf.metadata().cfId));
@@ -121,73 +120,6 @@ public class CounterMutation implements 
         commands.add(new SliceByNamesReadCommand(table, key, queryPath, columnFamily.getColumnNames()));
     }
 
-    private Row mergeOldShards(String table, Row row) throws IOException
-    {
-        ColumnFamily cf = row.cf;
-        // random check for merging to allow lessening the performance impact
-        if (cf.metadata().getMergeShardsChance() > FBUtilities.threadLocalRandom().nextDouble())
-        {
-            ColumnFamily merger = computeShardMerger(cf);
-            if (merger != null)
-            {
-                RowMutation localMutation = new RowMutation(table, row.key.key);
-                localMutation.add(merger);
-                localMutation.apply();
-
-                cf.addAll(merger);
-            }
-        }
-        return row;
-    }
-
-    private ColumnFamily computeShardMerger(ColumnFamily cf)
-    {
-        ColumnFamily merger = null;
-
-        // CF type: regular
-        if (!cf.isSuper())
-        {
-            for (IColumn column : cf.getSortedColumns())
-            {
-                if (!(column instanceof CounterColumn))
-                    continue;
-                IColumn c = ((CounterColumn)column).computeOldShardMerger();
-                if (c != null)
-                {
-                    if (merger == null)
-                        merger = cf.cloneMeShallow();
-                    merger.addColumn(c);
-                }
-            }
-        }
-        else // CF type: super
-        {
-            for (IColumn superColumn : cf.getSortedColumns())
-            {
-                IColumn mergerSuper = null;
-                for (IColumn column : superColumn.getSubColumns())
-                {
-                    if (!(column instanceof CounterColumn))
-                        continue;
-                    IColumn c = ((CounterColumn)column).computeOldShardMerger();
-                    if (c != null)
-                    {
-                        if (mergerSuper == null)
-                            mergerSuper = ((SuperColumn)superColumn).cloneMeShallow();
-                        mergerSuper.addColumn(c);
-                    }
-                }
-                if (mergerSuper != null)
-                {
-                    if (merger == null)
-                        merger = cf.cloneMeShallow();
-                    merger.addColumn(mergerSuper);
-                }
-            }
-        }
-        return merger;
-    }
-
     public Message makeMutationMessage(int version) throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Memtable.java Mon Nov
 7 13:26:00 2011
@@ -399,4 +399,9 @@ public class Memtable implements Compara
     {
         return System.currentTimeMillis() > creationTime + cfs.getMemtableFlushAfterMins()
* 60 * 1000L;
     }
+
+    public long creationTime()
+    {
+        return creationTime;
+    }
 }

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionController.java
Mon Nov  7 13:26:00 2011
@@ -46,6 +46,7 @@ public class CompactionController
 
     public final boolean isMajor;
     public final int gcBefore;
+    public final int mergeShardBefore;
     private int throttleResolution;
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables,
int gcBefore, boolean forceDeserialize)
@@ -54,6 +55,11 @@ public class CompactionController
         this.cfs = cfs;
         this.sstables = new HashSet<SSTableReader>(sstables);
         this.gcBefore = gcBefore;
+        // If we merge an old NodeId, we must make sure that no further increments for
that id are in an active memtable.
+        // For that, we must make sure that this id was renewed before the creation of the
oldest unflushed memtable. We
+        // add 5 minutes to be sure we're on the safe side in terms of thread safety (though
we should be fine in our
+        // current 'stop all write during memtable switch' situation).
+        this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000);
         this.forceDeserialize = forceDeserialize;
         isMajor = cfs.isCompleteSSTables(this.sstables);
         // how many rows we expect to compact in 100ms

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
Mon Nov  7 13:26:00 2011
@@ -202,7 +202,7 @@ public class LazilyCompactedRow extends 
 
         protected IColumn getReduced()
         {
-            ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(shouldPurge,
controller, container);
+            ColumnFamily purged = PrecompactedRow.removeDeletedAndOldShards(key, shouldPurge,
controller, container);
             if (purged == null || !purged.iterator().hasNext())
             {
                 container.clear();

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
Mon Nov  7 13:26:00 2011
@@ -57,14 +57,14 @@ public class PrecompactedRow extends Abs
 
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController
controller, ColumnFamily cf)
     {
-        return removeDeletedAndOldShards(controller.shouldPurge(key), controller, cf);
+        return removeDeletedAndOldShards(key, controller.shouldPurge(key), controller, cf);
     }
 
-    public static ColumnFamily removeDeletedAndOldShards(boolean shouldPurge, CompactionController
controller, ColumnFamily cf)
+    public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge,
CompactionController controller, ColumnFamily cf)
     {
         ColumnFamily compacted = shouldPurge ? ColumnFamilyStore.removeDeleted(cf, controller.gcBefore)
: cf;
         if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
-            CounterColumn.removeOldShards(compacted, controller.gcBefore);
+            CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore);
         return compacted;
     }
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/context/CounterContext.java
Mon Nov  7 13:26:00 2011
@@ -69,10 +69,6 @@ public class CounterContext implements I
 
     private static final Logger logger = Logger.getLogger(CounterContext.class);
 
-    // Time in ms since a node id has been renewed before we consider using it
-    // during a merge
-    private static final long MIN_MERGE_DELAY = 5 * 60 * 1000; // should be aplenty
-
     // lazy-load singleton
     private static class LazyHolder
     {
@@ -502,84 +498,123 @@ public class CounterContext implements I
 
     /**
      * Compute a new context such that if applied to context yields the same
-     * total but with the older local node id merged into the second to older one
-     * (excluding current local node id) if need be.
+     * total but with old local node ids nullified and their content merged into
+     * the current localNodeId.
      */
-    public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord>
oldIds)
+    public ByteBuffer computeOldShardMerger(ByteBuffer context, List<NodeId.NodeIdRecord>
oldIds, long mergeBefore)
     {
         long now = System.currentTimeMillis();
         int hlength = headerLength(context);
-
-        // Don't bother if we know we can't find what we are looking for
-        if (oldIds.size() < 2
-         || now - oldIds.get(0).timestamp < MIN_MERGE_DELAY
-         || now - oldIds.get(1).timestamp < MIN_MERGE_DELAY
-         || context.remaining() - hlength < 2 * STEP_LENGTH)
-            return null;
+        NodeId localId = NodeId.getLocalId();
 
         Iterator<NodeId.NodeIdRecord> recordIterator = oldIds.iterator();
-        NodeId.NodeIdRecord currRecord = recordIterator.next();
+        NodeId.NodeIdRecord currRecord = recordIterator.hasNext() ? recordIterator.next()
: null;
 
         ContextState state = new ContextState(context, hlength);
         ContextState foundState = null;
 
+        List<NodeId> toMerge = new ArrayList<NodeId>();
+        long mergeTotal = 0;
         while (state.hasRemaining() && currRecord != null)
         {
-            if (now - currRecord.timestamp < MIN_MERGE_DELAY)
-                return context;
+            assert !currRecord.id.equals(localId);
 
-            assert !currRecord.id.equals(NodeId.getLocalId());
+            NodeId nodeId = state.getNodeId();
+            int c = nodeId.compareTo(currRecord.id);
 
-            int c = state.getNodeId().compareTo(currRecord.id);
-            if (c == 0)
+            if (c > 0)
+            {
+                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+                continue;
+            }
+
+            if (state.isDelta())
             {
-                if (foundState == null)
+                if (state.getClock() < 0)
                 {
-                    // We found a canditate for being merged
-                    if (state.getClock() < 0)
-                        return null;
-
-                    foundState = state.duplicate();
-                    currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
-                    state.moveToNext();
+                    // Already merged shard, waiting to be collected
+
+                    if (nodeId.equals(localId))
+                        // we should not get there, but we have been creating problematic
context prior to #2968
+                        throw new RuntimeException("Current nodeId with a negative clock
(likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true
to fix.");
+
+                    if (state.getCount() != 0)
+                    {
+                        // This should not happen, but previous bugs have generated this
(#2968 in particular) so fixing it.
+                        logger.error(String.format("Invalid counter context (clock is %d
and count is %d for NodeId %s), will fix", state.getCount(), state.getCount(), nodeId.toString()));
+                        toMerge.add(nodeId);
+                        mergeTotal += state.getCount();
+                    }
                 }
-                else
+                else if (c == 0)
                 {
-                    assert !foundState.getNodeId().equals(state.getNodeId());
-
-                    // Found someone to merge it to
-                    int nbDelta = foundState.isDelta() ? 1 : 0;
-                    nbDelta += state.isDelta() ? 1 : 0;
-                    ContextState merger = ContextState.allocate(2, nbDelta);
-
-                    long fclock = foundState.getClock();
-                    long fcount = foundState.getCount();
-                    long clock = state.getClock();
-                    long count = state.getCount();
+                    // Found an old id. However, merging an oldId that has just been renewed
isn't safe, so
+                    // we check that it has been renewed before mergeBefore.
+                    if (currRecord.timestamp < mergeBefore)
+                    {
+                        toMerge.add(nodeId);
+                        mergeTotal += state.getCount();
+                    }
+                }
+            }
 
-                    if (foundState.isDelta())
-                        merger.writeElement(foundState.getNodeId(), -now - fclock, -fcount,
true);
-                    else
-                        merger.writeElement(foundState.getNodeId(), -now, 0);
+            if (c == 0)
+                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
 
-                    if (state.isDelta())
-                        merger.writeElement(state.getNodeId(), fclock + clock, fcount, true);
-                    else
-                        merger.writeElement(state.getNodeId(), fclock + clock, fcount + count);
+            state.moveToNext();
+        }
+        // Continuing the iteration so that we can repair invalid shards
+        while (state.hasRemaining())
+        {
+            NodeId nodeId = state.getNodeId();
+            if (state.isDelta() && state.getClock() < 0)
+            {
+                if (nodeId.equals(localId))
+                    // we should not get there, but we have been creating problematic context
prior to #2968
+                    throw new RuntimeException("Current nodeId with a negative clock (likely
due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true to fix.");
 
-                    return merger.context;
+                if (state.getCount() != 0)
+                {
+                    // This should not happen, but previous bugs have generated this (#2968
in particular) so fixing it.
+                    logger.error(String.format("Invalid counter context (clock is %d and
count is %d for NodeId %s), will fix", state.getClock(), state.getCount(), nodeId.toString()));
+                    toMerge.add(nodeId);
+                    mergeTotal += state.getCount();
                 }
             }
-            else if (c < 0) // nodeid < record
+            state.moveToNext();
+        }
+
+        if (toMerge.isEmpty())
+            return null;
+
+        ContextState merger = ContextState.allocate(toMerge.size() + 1, toMerge.size() +
1);
+        state.reset();
+        int i = 0;
+        int removedTotal = 0;
+        boolean localWritten = false;
+        while (state.hasRemaining())
+        {
+            NodeId nodeId = state.getNodeId();
+            if (nodeId.compareTo(localId) > 0)
             {
-                state.moveToNext();
+                merger.writeElement(localId, 1L, mergeTotal, true);
+                localWritten = true;
             }
-            else // c > 0, nodeid > record
+            else if (i < toMerge.size() && nodeId.compareTo(toMerge.get(i)) ==
0)
             {
-                currRecord = recordIterator.hasNext() ? recordIterator.next() : null;
+                long count = state.getCount();
+                removedTotal += count;
+                merger.writeElement(nodeId, -now - state.getClock(), -count, true);
+                ++i;
             }
+            state.moveToNext();
         }
-        return null;
+        if (!localWritten)
+            merger.writeElement(localId, 1L, mergeTotal, true);
+
+        // sanity check
+        assert mergeTotal == removedTotal;
+        return merger.context;
     }
 
     /**
@@ -592,59 +627,61 @@ public class CounterContext implements I
     {
         int hlength = headerLength(context);
         ContextState state = new ContextState(context, hlength);
-        int removedBodySize = 0, removedHeaderSize = 0;
-        boolean forceFixing = false;
+        int removedShards = 0;
         while (state.hasRemaining())
         {
             long clock = state.getClock();
-            if (clock < 0 && -((int)(clock / 1000)) < gcBefore && (state.getCount()
== 0 || !state.isDelta()))
-            {
-                removedBodySize += STEP_LENGTH;
-                if (state.isDelta())
-                    removedHeaderSize += HEADER_ELT_LENGTH;
-            }
-            else if (clock < 0 && state.getCount() != 0 && state.isDelta())
+            if (clock < 0)
             {
-                forceFixing = true;
+                // We should never have a count != 0 when clock < 0.
+                // We know that previous versions may have created such situations though, so:
+                //   * for delta shard: we throw an exception since computeOldShardMerger
should
+                //     have corrected that situation
+                //   * for non-delta shard: it is a much worse situation because
there is
+                //     not much we can do since we are not responsible for that shard. So
we simply
+                //     ignore the shard.
+                if (state.getCount() != 0)
+                {
+                    if (state.isDelta())
+                    {
+                        throw new IllegalStateException("Counter shard with negative clock
but count != 0; context = " + toString(context));
+                    }
+                    else
+                    {
+                        logger.debug("Ignoring non-removable non-delta corrupted shard in
context " + toString(context));
+                        state.moveToNext();
+                        continue;
+                    }
+                }
+
+                if (-((int)(clock / 1000)) < gcBefore)
+                    removedShards++;
             }
             state.moveToNext();
         }
 
-        if (removedBodySize == 0 && !forceFixing)
+        if (removedShards == 0)
             return context;
 
-        int newSize = context.remaining() - removedHeaderSize - removedBodySize;
+
+        int removedHeaderSize = removedShards * HEADER_ELT_LENGTH;
+        int newSize = context.remaining() - removedHeaderSize - (removedShards * STEP_LENGTH);
         int newHlength = hlength - removedHeaderSize;
         ByteBuffer cleanedContext = ByteBuffer.allocate(newSize);
         cleanedContext.putShort(cleanedContext.position(), (short) ((newHlength - HEADER_SIZE_LENGTH)
/ HEADER_ELT_LENGTH));
         ContextState cleaned = new ContextState(cleanedContext, newHlength);
 
         state.reset();
-        long toAddBack = 0;
         while (state.hasRemaining())
         {
             long clock = state.getClock();
-            if (!(clock < 0 && -((int)(clock / 1000)) < gcBefore &&
(state.getCount() == 0 || !state.isDelta())))
+            if (!(clock < 0 && state.getCount() == 0))
             {
-                if (clock < 0 && state.getCount() != 0 && state.isDelta())
-                {
-                    // we should not get there, but we have been creating problematic context
prior to #2968
-                    if (state.getNodeId().equals(NodeId.getLocalId()))
-                        throw new RuntimeException("Merged counter shard with a count !=
0 (likely due to #2968). You need to restart this node with -Dcassandra.renew_counter_id=true
to fix.");
-
-                    // we will "fix" it, but log a message
-                    logger.info("Collectable old shard with a count != 0. Will fix.");
-                    cleaned.writeElement(state.getNodeId(), clock - 1L, 0, true);
-                    toAddBack += state.getCount();
-                }
-                else
-                {
-                    state.copyTo(cleaned);
-                }
+                state.copyTo(cleaned);
             }
             state.moveToNext();
         }
-        return toAddBack == 0 ? cleanedContext : merge(cleanedContext, create(toAddBack));
+        return cleanedContext;
     }
 
     /**

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageProxy.java
Mon Nov  7 13:26:00 2011
@@ -227,7 +227,7 @@ public class StorageProxy implements Sto
         return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
     }
 
-    private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
+    public static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
     throws IOException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
@@ -1105,7 +1105,7 @@ public class StorageProxy implements Sto
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
 
-    private interface WritePerformer
+    public interface WritePerformer
     {
         public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
     }

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
(original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/CounterMutationTest.java
Mon Nov  7 13:26:00 2011
@@ -19,6 +19,7 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 
 import org.junit.Test;
@@ -87,6 +88,7 @@ public class CounterMutationTest extends
     public void testRemoveOldShardFixCorrupted() throws IOException
     {
         CounterContext ctx = CounterContext.instance();
+        int now = (int) (System.currentTimeMillis() / 1000);
 
         // Check that corrupted context created prior to #2968 are fixed by removeOldShards
         NodeId id1 = NodeId.getLocalId();
@@ -96,19 +98,21 @@ public class CounterMutationTest extends
         ContextState state = ContextState.allocate(3, 2);
         state.writeElement(NodeId.fromInt(1), 1, 4, false);
         state.writeElement(id1, 3, 2, true);
-        state.writeElement(id2, -System.currentTimeMillis(), 5, true); // corrupted!
+        state.writeElement(id2, -100, 5, true); // corrupted!
 
         assert ctx.total(state.context) == 11;
 
         try
         {
-            ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+            ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+            ctx.removeOldShards(ctx.merge(state.context, merger), now);
             fail("RemoveOldShards should throw an exception if the current id is non-sensical");
         }
         catch (RuntimeException e) {}
 
         NodeId.renewLocalId();
-        ByteBuffer cleaned = ctx.removeOldShards(state.context, Integer.MAX_VALUE);
+        ByteBuffer merger = ctx.computeOldShardMerger(state.context, Collections.<NodeId.NodeIdRecord>emptyList(), 0);
+        ByteBuffer cleaned = ctx.removeOldShards(ctx.merge(state.context, merger), now);
         assert ctx.total(cleaned) == 11;
 
         // Check it is not corrupted anymore

Modified: cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java?rev=1198725&r1=1198724&r2=1198725&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
(original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/context/CounterContextTest.java
Mon Nov  7 13:26:00 2011
@@ -326,54 +326,31 @@ public class CounterContextTest
         records.add(new NodeId.NodeIdRecord(id1, 2L));
         records.add(new NodeId.NodeIdRecord(id3, 4L));
 
-        // Destination of merge is a delta
-        ContextState ctx = ContextState.allocate(5, 2);
-        ctx.writeElement(id1, 1L, 1L);
+        ContextState ctx = ContextState.allocate(5, 3);
+        ctx.writeElement(id1, 1L, 1L, true);
         ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
         ctx.writeElement(id3, 3L, 3L, true);
         ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
         ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
 
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
+
         ContextState m = new ContextState(merger);
 
         assert m.getNodeId().equals(id1);
         assert m.getClock() <= -now;
-        assert m.getCount() == 0;
+        assert m.getCount() == -1L;
+        assert m.isDelta();
         m.moveToNext();
         assert m.getNodeId().equals(id3);
-        assert m.getClock() == 4L;
-        assert m.getCount() == 1L;
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger));
-
-        // Source of merge is a delta
-        ctx = ContextState.allocate(4, 1);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger));
-
-        // source and destination of merge are deltas
-        ctx = ContextState.allocate(4, 2);
-        ctx.writeElement(id1, 1L, 1L, true);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L, true);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
-        assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger));
-
-        // none of source and destination of merge are deltas
-        ctx = ContextState.allocate(4, 0);
-        ctx.writeElement(id1, 1L, 1L);
-        ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
-        ctx.writeElement(id3, 3L, 3L);
-        ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
-
-        merger = cc.computeOldShardMerger(ctx.context, records);
+        assert m.getClock() <= -now;
+        assert m.getCount() == -3L;
+        assert m.isDelta();
+        m.moveToNext();
+        assert m.getNodeId().equals(NodeId.getLocalId());
+        assert m.getClock() == 1L;
+        assert m.getCount() == 4L;
+        assert m.isDelta();
         assert cc.total(ctx.context) == cc.total(cc.merge(ctx.context, merger));
     }
 
@@ -388,29 +365,20 @@ public class CounterContextTest
         records.add(new NodeId.NodeIdRecord(id3, 4L));
         records.add(new NodeId.NodeIdRecord(id6, 10L));
 
-        ContextState ctx = ContextState.allocate(6, 2);
-        ctx.writeElement(id1, 1L, 1L);
+        ContextState ctx = ContextState.allocate(6, 3);
+        ctx.writeElement(id1, 1L, 1L, true);
         ctx.writeElement(NodeId.fromInt(2), 2L, 2L);
         ctx.writeElement(id3, 3L, 3L, true);
         ctx.writeElement(NodeId.fromInt(4), 6L, 3L);
         ctx.writeElement(NodeId.fromInt(5), 7L, 3L, true);
         ctx.writeElement(id6, 5L, 6L);
 
-        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records);
+        ByteBuffer merger = cc.computeOldShardMerger(ctx.context, records, Integer.MAX_VALUE);
         ByteBuffer merged = cc.merge(ctx.context, merger);
         assert cc.total(ctx.context) == cc.total(merged);
 
         ByteBuffer cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
         assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - stepLength;
-
-        merger = cc.computeOldShardMerger(cleaned, records);
-        merged = cc.merge(cleaned, merger);
-        assert cc.total(ctx.context) == cc.total(merged);
-
-        cleaned = cc.removeOldShards(merged, (int)(System.currentTimeMillis() / 1000) + 1);
-        assert cc.total(ctx.context) == cc.total(cleaned);
-        assert cleaned.remaining() == ctx.context.remaining() - 2 * stepLength - 2;
-
+        assert cleaned.remaining() == ctx.context.remaining() - stepLength - 2;
     }
 }



Mime
View raw message