cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [2/3] cassandra git commit: Improve batchlog write path
Date Wed, 02 Sep 2015 07:51:12 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/db/WriteResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteResponse.java b/src/java/org/apache/cassandra/db/WriteResponse.java
index 824368e..0dddaab 100644
--- a/src/java/org/apache/cassandra/db/WriteResponse.java
+++ b/src/java/org/apache/cassandra/db/WriteResponse.java
@@ -28,16 +28,22 @@ import org.apache.cassandra.net.MessagingService;
 /*
  * This empty response is sent by a replica to inform the coordinator that the write succeeded
  */
-public class WriteResponse
+public final class WriteResponse
 {
-    public static final WriteResponseSerializer serializer = new WriteResponseSerializer();
+    public static final Serializer serializer = new Serializer();
 
-    public MessageOut<WriteResponse> createMessage()
+    private static final WriteResponse instance = new WriteResponse();
+
+    private WriteResponse()
+    {
+    }
+
+    public static MessageOut<WriteResponse> createMessage()
     {
-        return new MessageOut<WriteResponse>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer);
+        return new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, instance, serializer);
     }
 
-    public static class WriteResponseSerializer implements IVersionedSerializer<WriteResponse>
+    public static class Serializer implements IVersionedSerializer<WriteResponse>
     {
         public void serialize(WriteResponse wm, DataOutputPlus out, int version) throws IOException
         {
@@ -45,7 +51,7 @@ public class WriteResponse
 
         public WriteResponse deserialize(DataInputPlus in, int version) throws IOException
         {
-            return new WriteResponse();
+            return instance;
         }
 
         public long serializedSize(WriteResponse response, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
index 2797495..56727fc 100644
--- a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
+++ b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
@@ -65,8 +65,8 @@ final class EncodedHintMessage
             if (version != message.version)
                 throw new IllegalArgumentException("serializedSize() called with non-matching version " + version);
 
-            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
-            size += TypeSizes.sizeof(message.hint.remaining());
+            long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
+            size += TypeSizes.sizeofVInt(message.hint.remaining());
             size += message.hint.remaining();
             return size;
         }
@@ -77,7 +77,7 @@ final class EncodedHintMessage
                 throw new IllegalArgumentException("serialize() called with non-matching version " + version);
 
             UUIDSerializer.serializer.serialize(message.hostId, out, version);
-            out.writeInt(message.hint.remaining());
+            out.writeVInt(message.hint.remaining());
             out.write(message.hint);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
index d8f85c5..c88c494 100644
--- a/src/java/org/apache/cassandra/hints/Hint.java
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -26,6 +26,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+import static org.apache.cassandra.db.TypeSizes.sizeofVInt;
+
 /**
  * Encapsulates the hinted mutation, its creation time, and the gc grace seconds param for each table involved.
  *
@@ -107,8 +110,8 @@ public final class Hint
     {
         public long serializedSize(Hint hint, int version)
         {
-            long size = TypeSizes.sizeof(hint.creationTime);
-            size += TypeSizes.sizeof(hint.gcgs);
+            long size = sizeof(hint.creationTime);
+            size += sizeofVInt(hint.gcgs);
             size += Mutation.serializer.serializedSize(hint.mutation, version);
             return size;
         }
@@ -116,14 +119,14 @@ public final class Hint
         public void serialize(Hint hint, DataOutputPlus out, int version) throws IOException
         {
             out.writeLong(hint.creationTime);
-            out.writeInt(hint.gcgs);
+            out.writeVInt(hint.gcgs);
             Mutation.serializer.serialize(hint.mutation, out, version);
         }
 
         public Hint deserialize(DataInputPlus in, int version) throws IOException
         {
             long creationTime = in.readLong();
-            int gcgs = in.readInt();
+            int gcgs = (int) in.readVInt();
             return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
index 89baa89..6296a8c 100644
--- a/src/java/org/apache/cassandra/hints/HintMessage.java
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -24,6 +24,8 @@ import java.util.UUID;
 
 import javax.annotation.Nullable;
 
+import com.google.common.primitives.Ints;
+
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -81,10 +83,10 @@ public final class HintMessage
     {
         public long serializedSize(HintMessage message, int version)
         {
-            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
+            long size = UUIDSerializer.serializer.serializedSize(message.hostId, version);
 
-            int hintSize = (int) Hint.serializer.serializedSize(message.hint, version);
-            size += TypeSizes.sizeof(hintSize);
+            long hintSize = Hint.serializer.serializedSize(message.hint, version);
+            size += TypeSizes.sizeofVInt(hintSize);
             size += hintSize;
 
             return size;
@@ -100,7 +102,7 @@ public final class HintMessage
              * We are serializing the hint size so that the receiver of the message could gracefully handle
              * deserialize failure when a table had been dropped, by simply skipping the unread bytes.
              */
-            out.writeInt((int) Hint.serializer.serializedSize(message.hint, version));
+            out.writeVInt(Hint.serializer.serializedSize(message.hint, version));
 
             Hint.serializer.serialize(message.hint, out, version);
         }
@@ -114,7 +116,7 @@ public final class HintMessage
         {
             UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
 
-            int hintSize = in.readInt();
+            long hintSize = in.readVInt();
             BytesReadTracker countingIn = new BytesReadTracker(in);
             try
             {
@@ -122,7 +124,7 @@ public final class HintMessage
             }
             catch (UnknownColumnFamilyException e)
             {
-                in.skipBytes(hintSize - (int) countingIn.getBytesRead());
+                in.skipBytes(Ints.checkedCast(hintSize - countingIn.getBytesRead()));
                 return new HintMessage(hostId, e.cfId);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
index 196f184..b0095ed 100644
--- a/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
+++ b/src/java/org/apache/cassandra/hints/LegacyHintsMigrator.java
@@ -92,7 +92,7 @@ public final class LegacyHintsMigrator
         compactLegacyHints();
 
         // paginate over legacy hints and write them to the new storage
-        logger.info("Migrating legacy hints to the new storage");
+        logger.info("Writing legacy hints to the new storage");
         migrateLegacyHints();
 
         // truncate the legacy hints table

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index e59cd58..15199fe 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.BootStrapper;
 import org.apache.cassandra.dht.IPartitioner;
@@ -103,8 +104,8 @@ public final class MessagingService implements MessagingServiceMBean
         READ_REPAIR,
         READ,
         REQUEST_RESPONSE, // client-initiated reads and writes
-        @Deprecated STREAM_INITIATE,
-        @Deprecated STREAM_INITIATE_DONE,
+        BATCH_STORE,  // was @Deprecated STREAM_INITIATE,
+        BATCH_REMOVE, // was @Deprecated STREAM_INITIATE_DONE,
         @Deprecated STREAM_REPLY,
         @Deprecated STREAM_REQUEST,
         RANGE_SLICE,
@@ -135,7 +136,6 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        BATCHLOG_MUTATION,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -149,13 +149,14 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, Stage.MUTATION);
         put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
-        put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.HINT, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
         put(Verb.PAXOS_PROPOSE, Stage.MUTATION);
         put(Verb.PAXOS_COMMIT, Stage.MUTATION);
+        put(Verb.BATCH_STORE, Stage.MUTATION);
+        put(Verb.BATCH_REMOVE, Stage.MUTATION);
 
         put(Verb.READ, Stage.READ);
         put(Verb.RANGE_SLICE, Stage.READ);
@@ -209,7 +210,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
 
         put(Verb.MUTATION, Mutation.serializer);
-        put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         put(Verb.RANGE_SLICE, ReadCommand.rangeSliceSerializer);
@@ -229,6 +229,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.PAXOS_PROPOSE, Commit.serializer);
         put(Verb.PAXOS_COMMIT, Commit.serializer);
         put(Verb.HINT, HintMessage.serializer);
+        put(Verb.BATCH_STORE, Batch.serializer);
+        put(Verb.BATCH_REMOVE, UUIDSerializer.serializer);
     }};
 
     /**
@@ -238,7 +240,6 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, WriteResponse.serializer);
         put(Verb.HINT, HintResponse.serializer);
-        put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.rangeSliceSerializer);
@@ -254,6 +255,9 @@ public final class MessagingService implements MessagingServiceMBean
 
         put(Verb.PAXOS_PREPARE, PrepareResponse.serializer);
         put(Verb.PAXOS_PROPOSE, BooleanSerializer.serializer);
+
+        put(Verb.BATCH_STORE, WriteResponse.serializer);
+        put(Verb.BATCH_REMOVE, WriteResponse.serializer);
     }};
 
     /* This records all the results mapped by message Id */
@@ -286,7 +290,7 @@ public final class MessagingService implements MessagingServiceMBean
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<Verb, IVerbHandler> verbHandlers;
 
-    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+    private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
 
     private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
@@ -301,14 +305,15 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
                                                                    Verb.MUTATION,
-                                                                   Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
                                                                    Verb.COUNTER_MUTATION,
                                                                    Verb.HINT,
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
                                                                    Verb.RANGE_SLICE,
                                                                    Verb.PAGED_RANGE,
-                                                                   Verb.REQUEST_RESPONSE);
+                                                                   Verb.REQUEST_RESPONSE,
+                                                                   Verb.BATCH_STORE,
+                                                                   Verb.BATCH_REMOVE);
 
 
     private static final class DroppedMessages
@@ -372,7 +377,7 @@ public final class MessagingService implements MessagingServiceMBean
             droppedMessagesMap.put(verb, new DroppedMessages(verb));
 
         listenGate = new SimpleCondition();
-        verbHandlers = new EnumMap<Verb, IVerbHandler>(Verb.class);
+        verbHandlers = new EnumMap<>(Verb.class);
         if (!testOnly)
         {
             Runnable logDropped = new Runnable()
@@ -630,7 +635,6 @@ public final class MessagingService implements MessagingServiceMBean
                            boolean allowHints)
     {
         assert message.verb == Verb.MUTATION
-            || message.verb == Verb.BATCHLOG_MUTATION
             || message.verb == Verb.COUNTER_MUTATION
             || message.verb == Verb.PAXOS_COMMIT;
         int messageId = nextId();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index cf1e021..c8b9677 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -27,7 +27,6 @@ import java.rmi.registry.LocateRegistry;
 import java.rmi.server.RMIServerSocketFactory;
 import java.util.Collections;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -41,7 +40,6 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistryListener;
 import com.codahale.metrics.SharedMetricRegistries;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.slf4j.Logger;
@@ -52,6 +50,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.StartupException;
@@ -280,6 +279,9 @@ public class CassandraDaemon
         // migrate any legacy (pre-3.0) hints from system.hints table into the new store
         new LegacyHintsMigrator(DatabaseDescriptor.getHintsDirectory(), DatabaseDescriptor.getMaxHintsFileSize()).migrate();
 
+        // migrate any legacy (pre-3.0) batch entries from system.batchlog to system.batches (new table format)
+        LegacyBatchlogMigrator.migrate();
+
         // enable auto compaction
         for (Keyspace keyspace : Keyspace.all())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4952959..59f1c1c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -24,12 +24,9 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
@@ -44,14 +41,19 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.HintedHandOffManager;
+import org.apache.cassandra.batchlog.*;
+import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.MaterializedViewManager;
 import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.RingPosition;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
@@ -67,8 +69,8 @@ import org.apache.cassandra.service.paxos.PrepareCallback;
 import org.apache.cassandra.service.paxos.ProposeCallback;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.triggers.TriggerExecutor;
-import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.AbstractIterator;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -687,31 +689,19 @@ public class StorageProxy implements StorageProxyMBean
                                                                                  WriteType.BATCH,
                                                                                  cleanup);
 
-                //When local node is the endpoint and there are no pending nodes we can
+                // When local node is the endpoint and there are no pending nodes we can
                 // Just apply the mutation locally.
-                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
-                    wrapper.handler.pendingEndpoints.isEmpty())
-                {
-                    if (writeCommitLog)
-                        mutation.apply();
-                    else
-                        mutation.applyUnsafe();
-                }
+                if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) && wrapper.handler.pendingEndpoints.isEmpty())
+                    mutation.apply(writeCommitLog);
                 else
-                {
                     wrappers.add(wrapper);
-                }
             }
 
             if (!wrappers.isEmpty())
             {
-                Mutation blMutation = BatchlogManager.getBatchlogMutationFor(Lists.transform(wrappers, w -> w.mutation), batchUUID, MessagingService.current_version);
-
-                //Apply to local batchlog memtable in this thread
-                if (writeCommitLog)
-                    blMutation.apply();
-                else
-                    blMutation.applyUnsafe();
+                // Apply to local batchlog memtable in this thread
+                BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(), Lists.transform(wrappers, w -> w.mutation)),
+                                      writeCommitLog);
 
                 // now actually perform the writes and wait for them to complete
                 asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
@@ -781,16 +771,10 @@ public class StorageProxy implements StorageProxyMBean
                     batchConsistencyLevel = consistency_level;
             }
 
-            final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
+            final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          new BatchlogResponseHandler.BatchlogCleanupCallback()
-                                                                                                          {
-                                                                                                              public void invoke()
-                                                                                                              {
-                                                                                                                  asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
-                                                                                                              }
-                                                                                                          });
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -840,75 +824,64 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+    throws WriteTimeoutException, WriteFailureException
+    {
+        WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
+                                                                     Collections.<InetAddress>emptyList(),
+                                                                     endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
+                                                                     Keyspace.open(SystemKeyspace.NAME),
+                                                                     null,
+                                                                     WriteType.BATCH_LOG);
+
+        Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
+
+        if (!endpoints.current.isEmpty())
+            syncWriteToBatchlog(handler, batch, endpoints.current);
+
+        if (!endpoints.legacy.isEmpty())
+            LegacyBatchlogMigrator.syncWriteToBatchlog(handler, batch, endpoints.legacy);
+
+        handler.get();
+    }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
+    private static void syncWriteToBatchlog(WriteResponseHandler<?> handler, Batch batch, Collection<InetAddress> endpoints)
     throws WriteTimeoutException, WriteFailureException
     {
-        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
-                                                                        Collections.<InetAddress>emptyList(),
-                                                                        ConsistencyLevel.ONE,
-                                                                        Keyspace.open(SystemKeyspace.NAME),
-                                                                        null,
-                                                                        WriteType.BATCH_LOG);
+        MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
 
-        MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
-                                                      .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
         for (InetAddress target : endpoints)
         {
-            int targetVersion = MessagingService.instance().getVersion(target);
+            logger.debug("Sending batchlog store request {} to {} for {} mutations", batch.id, target, batch.size());
+
             if (canDoLocalRequest(target))
-            {
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
-            }
-            else if (targetVersion < MessagingService.VERSION_30)
-            {
-                MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
-                                                                  .createMessage(MessagingService.Verb.MUTATION),
-                                                   target,
-                                                   handler,
-                                                   false);
-            }
+                performLocally(Stage.MUTATION, () -> BatchlogManager.store(batch), handler);
             else
-            {
-                MessagingService.instance().sendRR(message, target, handler, false);
-            }
+                MessagingService.instance().sendRR(message, target, handler);
         }
+    }
 
-        handler.get();
+    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+    {
+        if (!endpoints.current.isEmpty())
+            asyncRemoveFromBatchlog(endpoints.current, uuid);
+
+        if (!endpoints.legacy.isEmpty())
+            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
     {
-        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
-                                                                        Collections.<InetAddress>emptyList(),
-                                                                        ConsistencyLevel.ANY,
-                                                                        Keyspace.open(SystemKeyspace.NAME),
-                                                                        null,
-                                                                        WriteType.SIMPLE);
-        Mutation mutation = new Mutation(
-                PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batches,
-                                                    UUIDType.instance.decompose(uuid),
-                                                    FBUtilities.timestampMicros(),
-                                                    FBUtilities.nowInSeconds()));
-        MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
+        MessageOut<UUID> message = new MessageOut<>(MessagingService.Verb.BATCH_REMOVE, uuid, UUIDSerializer.serializer);
         for (InetAddress target : endpoints)
         {
-            int targetVersion = MessagingService.instance().getVersion(target);
+            if (logger.isDebugEnabled())
+                logger.debug("Sending batchlog remove request {} to {}", uuid, target);
+
             if (canDoLocalRequest(target))
-            {
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
-            }
-            else if (targetVersion < MessagingService.VERSION_30)
-            {
-                MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.MUTATION),
-                                                   target,
-                                                   handler,
-                                                   false);
-            }
+                performLocally(Stage.MUTATION, () -> BatchlogManager.remove(uuid));
             else
-            {
-                MessagingService.instance().sendRR(message, target, handler, false);
-            }
+                MessagingService.instance().sendOneWay(message, target);
         }
     }
 
@@ -1034,13 +1007,38 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /*
+     * A class to filter batchlog endpoints into legacy endpoints (version < 3.0) or not.
+     */
+    private static final class BatchlogEndpoints
+    {
+        public final Collection<InetAddress> all;
+        public final Collection<InetAddress> current;
+        public final Collection<InetAddress> legacy;
+
+        BatchlogEndpoints(Collection<InetAddress> endpoints)
+        {
+            all = endpoints;
+            current = new ArrayList<>(2);
+            legacy = new ArrayList<>(2);
+
+            for (InetAddress ep : endpoints)
+            {
+                if (MessagingService.instance().getVersion(ep) >= MessagingService.VERSION_30)
+                    current.add(ep);
+                else
+                    legacy.add(ep);
+            }
+        }
+    }
+
+    /*
      * Replicas are picked manually:
      * - replicas should be alive according to the failure detector
      * - replicas should be in the local datacenter
      * - choose min(2, number of qualifying candiates above)
      * - allow the local node to be the only replica only if it's a single-node DC
      */
-    private static Collection<InetAddress> getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
+    private static BatchlogEndpoints getBatchlogEndpoints(String localDataCenter, ConsistencyLevel consistencyLevel)
     throws UnavailableException
     {
         TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology();
@@ -1051,12 +1049,12 @@ public class StorageProxy implements StorageProxyMBean
         if (chosenEndpoints.isEmpty())
         {
             if (consistencyLevel == ConsistencyLevel.ANY)
-                return Collections.singleton(FBUtilities.getBroadcastAddress());
+                return new BatchlogEndpoints(Collections.singleton(FBUtilities.getBroadcastAddress()));
 
             throw new UnavailableException(ConsistencyLevel.ONE, 1, 0);
         }
 
-        return chosenEndpoints;
+        return new BatchlogEndpoints(chosenEndpoints);
     }
 
     /**
@@ -1109,7 +1107,8 @@ public class StorageProxy implements StorageProxyMBean
                 if (canDoLocalRequest(destination))
                 {
                     insertLocal = true;
-                } else
+                }
+                else
                 {
                     // belongs on a different server
                     if (message == null)
@@ -1120,14 +1119,15 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         MessagingService.instance().sendRR(message, destination, responseHandler, true);
-                    } else
+                    }
+                    else
                     {
                         Collection<InetAddress> messages = (dcGroups != null) ? dcGroups.get(dc) : null;
                         if (messages == null)
                         {
-                            messages = new ArrayList<InetAddress>(3); // most DCs will have <= 3 replicas
+                            messages = new ArrayList<>(3); // most DCs will have <= 3 replicas
                             if (dcGroups == null)
-                                dcGroups = new HashMap<String, Collection<InetAddress>>();
+                                dcGroups = new HashMap<>();
                             dcGroups.put(dc, messages);
                         }
                         messages.add(destination);
@@ -1149,7 +1149,7 @@ public class StorageProxy implements StorageProxyMBean
             submitHint(mutation, endpointsToHint, responseHandler);
 
         if (insertLocal)
-            insertLocal(stage, mutation, responseHandler);
+            performLocally(stage, mutation::apply, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1198,7 +1198,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+    private static void performLocally(Stage stage, final Runnable runnable)
     {
         StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
         {
@@ -1206,14 +1206,32 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try
                 {
-                    mutation.apply();
-                    responseHandler.response(null);
+                    runnable.run();
+                }
+                catch (Exception ex)
+                {
+                    logger.error("Failed to apply mutation locally : {}", ex);
+                }
+            }
+        });
+    }
+
+    private static void performLocally(Stage stage, final Runnable runnable, final IAsyncCallbackWithFailure<?> handler)
+    {
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+        {
+            public void runMayThrow()
+            {
+                try
+                {
+                    runnable.run();
+                    handler.response(null);
                 }
                 catch (Exception ex)
                 {
                     if (!(ex instanceof WriteTimeoutException))
                         logger.error("Failed to apply mutation locally : {}", ex);
-                    responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                    handler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
         });

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a6c2f8b..13dc29c7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -55,6 +55,9 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
+import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
+import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.TransactionLog;
@@ -282,7 +285,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
@@ -311,6 +313,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.SNAPSHOT, new SnapshotVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.ECHO, new EchoVerbHandler());
+
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_STORE, new BatchStoreVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCH_REMOVE, new BatchRemoveVerbHandler());
     }
 
     public void registerDaemon(CassandraDaemon daemon)
@@ -620,12 +625,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             {
                 inShutdownHook = true;
                 ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
-                ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
                 if (mutationStage.isShutdown()
                     && counterMutationStage.isShutdown()
-                    && batchlogMutationStage.isShutdown()
                     && materializedViewMutationStage.isShutdown())
                     return; // drained already
 
@@ -638,12 +641,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
                 materializedViewMutationStage.shutdown();
-                batchlogMutationStage.shutdown();
                 HintsService.instance.pauseDispatch();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
                 materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-                batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3846,17 +3847,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     {
         inShutdownHook = true;
 
-        BatchlogManager.shutdown();
+        BatchlogManager.instance.shutdown();
 
         HintsService.instance.pauseDispatch();
 
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
-        ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
         ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
         if (mutationStage.isTerminated()
             && counterMutationStage.isTerminated()
-            && batchlogMutationStage.isTerminated()
             && materializedViewMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
@@ -3872,11 +3871,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
         materializedViewMutationStage.shutdown();
-        batchlogMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
         materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-        batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 
@@ -3913,6 +3910,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
+        BatchlogManager.instance.shutdown();
+
         HintsService.instance.shutdownBlocking();
 
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
index 213023e..a702a4d 100644
--- a/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/paxos/CommitVerbHandler.java
@@ -1,4 +1,3 @@
-package org.apache.cassandra.service.paxos;
 /*
  * 
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,7 @@ package org.apache.cassandra.service.paxos;
  * under the License.
  * 
  */
-
+package org.apache.cassandra.service.paxos;
 
 import org.apache.cassandra.db.WriteResponse;
 import org.apache.cassandra.net.IVerbHandler;
@@ -33,8 +32,7 @@ public class CommitVerbHandler implements IVerbHandler<Commit>
     {
         PaxosState.commit(message.payload);
 
-        WriteResponse response = new WriteResponse();
         Tracing.trace("Enqueuing acknowledge to {}", message.from);
-        MessagingService.instance().sendReply(response.createMessage(), id, message.from);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 6909ea4..5f77097 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -54,8 +54,8 @@ import javax.management.remote.JMXServiceURL;
 import javax.rmi.ssl.SslRMIClientSocketFactory;
 
 import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.BatchlogManagerMBean;
+import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManagerMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.HintedHandOffManagerMBean;
 import org.apache.cassandra.db.compaction.CompactionManager;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
index 9738103..b833e60 100644
--- a/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/MaterializedViewLongTest.java
@@ -35,7 +35,7 @@ import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.cassandra.concurrent.SEPExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.db.BatchlogManager;
+import org.apache.cassandra.batchlog.BatchlogManager;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class MaterializedViewLongTest extends CQLTester

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchTest.java b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
new file mode 100644
index 0000000..b7a4100
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+
+public class BatchTest
+{
+    private static final String KEYSPACE = "BatchRequestTest";
+    private static final String CF_STANDARD = "Standard";
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, CF_STANDARD, 1, BytesType.instance));
+    }
+
+    @Test
+    public void testSerialization() throws IOException
+    {
+        CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+        long now = FBUtilities.timestampMicros();
+        int version = MessagingService.current_version;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                          .clustering("name" + i)
+                          .add("val", "val" + i)
+                          .build());
+        }
+
+        Batch batch1 = Batch.createLocal(uuid, now, mutations);
+        assertEquals(uuid, batch1.id);
+        assertEquals(now, batch1.creationTime);
+        assertEquals(mutations, batch1.decodedMutations);
+
+        DataOutputBuffer out = new DataOutputBuffer();
+        Batch.serializer.serialize(batch1, out, version);
+
+        assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+        DataInputPlus dis = new DataInputBuffer(out.getData());
+        Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+        assertEquals(batch1.id, batch2.id);
+        assertEquals(batch1.creationTime, batch2.creationTime);
+        assertEquals(batch1.decodedMutations.size(), batch2.encodedMutations.size());
+
+        Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+        Iterator<ByteBuffer> it2 = batch2.encodedMutations.iterator();
+        while (it1.hasNext())
+        {
+            try (DataInputBuffer in = new DataInputBuffer(it2.next().array()))
+            {
+                assertEquals(it1.next().toString(), Mutation.serializer.deserialize(in, version).toString());
+            }
+        }
+    }
+
+    /**
+     * This exists only to exercise decodeMutations() during deserialization,
+     * since a Batch will never actually be serialized with version 2.2.
+     * @throws IOException
+     */
+    @Test
+    public void testSerializationNonCurrentVersion() throws IOException
+    {
+        CFMetaData cfm = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF_STANDARD).metadata;
+
+        long now = FBUtilities.timestampMicros();
+        int version = MessagingService.VERSION_22;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int i = 0; i < 10; i++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), bytes(i))
+                          .clustering("name" + i)
+                          .add("val", "val" + i)
+                          .build());
+        }
+
+        Batch batch1 = Batch.createLocal(uuid, now, mutations);
+        assertEquals(uuid, batch1.id);
+        assertEquals(now, batch1.creationTime);
+        assertEquals(mutations, batch1.decodedMutations);
+
+        DataOutputBuffer out = new DataOutputBuffer();
+        Batch.serializer.serialize(batch1, out, version);
+
+        assertEquals(out.getLength(), Batch.serializer.serializedSize(batch1, version));
+
+        DataInputPlus dis = new DataInputBuffer(out.getData());
+        Batch batch2 = Batch.serializer.deserialize(dis, version);
+
+        assertEquals(batch1.id, batch2.id);
+        assertEquals(batch1.creationTime, batch2.creationTime);
+        assertEquals(batch1.decodedMutations.size(), batch2.decodedMutations.size());
+
+        Iterator<Mutation> it1 = batch1.decodedMutations.iterator();
+        Iterator<Mutation> it2 = batch2.decodedMutations.iterator();
+        while (it1.hasNext())
+            assertEquals(it1.next().toString(), it2.next().toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
new file mode 100644
index 0000000..23aeaaa
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogEndpointFilterTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
+import org.junit.Test;
+import org.junit.matchers.JUnitMatchers;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class BatchlogEndpointFilterTest
+{
+    private static final String LOCAL = "local";
+
+    @Test
+    public void shouldSelect2hostsFromNonLocalRacks() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("2", InetAddress.getByName("2"))
+                .put("2", InetAddress.getByName("22"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("11")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("22")));
+    }
+
+    @Test
+    public void shouldSelectHostFromLocal() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(2));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("1")));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldReturnAsIsIfNoEnoughEndpoints() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        assertThat(result.size(), is(1));
+        assertThat(result, JUnitMatchers.hasItem(InetAddress.getByName("0")));
+    }
+
+    @Test
+    public void shouldSelectTwoRandomHostsFromSingleOtherRack() throws UnknownHostException
+    {
+        Multimap<String, InetAddress> endpoints = ImmutableMultimap.<String, InetAddress> builder()
+                .put(LOCAL, InetAddress.getByName("0"))
+                .put(LOCAL, InetAddress.getByName("00"))
+                .put("1", InetAddress.getByName("1"))
+                .put("1", InetAddress.getByName("11"))
+                .put("1", InetAddress.getByName("111"))
+                .build();
+        Collection<InetAddress> result = new TestEndpointFilter(LOCAL, endpoints).filter();
+        // result should contain random two distinct values
+        assertThat(new HashSet<>(result).size(), is(2));
+    }
+
+    private static class TestEndpointFilter extends BatchlogManager.EndpointFilter
+    {
+        TestEndpointFilter(String localRack, Multimap<String, InetAddress> endpoints)
+        {
+            super(localRack, endpoints);
+        }
+
+        @Override
+        protected boolean isValid(InetAddress input)
+        {
+            // Always treat endpoints as alive, non-localhost candidates
+            return true;
+        }
+
+        @Override
+        protected int getRandomInt(int bound)
+        {
+            // We don't need random behavior here
+            return bound - 1;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53a177a9/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
new file mode 100644
index 0000000..dfb17c3
--- /dev/null
+++ b/test/unit/org/apache/cassandra/batchlog/BatchlogManagerTest.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.batchlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Lists;
+import org.junit.*;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.Util.PartitionerSwitcher;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+import static org.junit.Assert.*;
+
+public class BatchlogManagerTest
+{
+    private static final String KEYSPACE1 = "BatchlogManagerTest1";
+    private static final String CF_STANDARD1 = "Standard1";
+    private static final String CF_STANDARD2 = "Standard2";
+    private static final String CF_STANDARD3 = "Standard3";
+    private static final String CF_STANDARD4 = "Standard4";
+    private static final String CF_STANDARD5 = "Standard5";
+
+    static PartitionerSwitcher sw;
+
+    @BeforeClass
+    public static void defineSchema() throws ConfigurationException
+    {
+        sw = Util.switchPartitioner(Murmur3Partitioner.instance);
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD3, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD4, 1, BytesType.instance),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5, 1, BytesType.instance));
+    }
+
+    @AfterClass
+    public static void cleanup()
+    {
+        sw.close();
+    }
+
+    @Before
+    @SuppressWarnings("deprecation")
+    public void setUp() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        InetAddress localhost = InetAddress.getByName("127.0.0.1");
+        metadata.updateNormalToken(Util.token("A"), localhost);
+        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).truncateBlocking();
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).truncateBlocking();
+    }
+
+    @Test
+    public void testDelete()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        CFMetaData cfm = cfs.metadata;
+        new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes("1234"))
+                .clustering("c")
+                .add("val", "val" + 1234)
+                .build()
+                .applyUnsafe();
+
+        DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234"));
+        ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build());
+        Iterator<Row> iter = results.iterator();
+        assert iter.hasNext();
+
+        Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(cfm,
+                                                         dk,
+                                                         FBUtilities.timestampMicros(),
+                                                         FBUtilities.nowInSeconds()));
+        mutation.applyUnsafe();
+
+        Util.assertEmpty(Util.cmd(cfs, dk).build());
+    }
+
+    @Test
+    public void testReplay() throws Exception
+    {
+        testReplay(false);
+    }
+
+    @Test
+    public void testLegacyReplay() throws Exception
+    {
+        testReplay(true);
+    }
+
+    @SuppressWarnings("deprecation")
+    private static void testReplay(boolean legacy) throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1).metadata;
+
+        // Generate 1000 mutations (100 batches of 10 mutations each) and put them all into the batchlog.
+        // Half batches (50) ready to be replayed, half not.
+        for (int i = 0; i < 100; i++)
+        {
+            List<Mutation> mutations = new ArrayList<>(10);
+            for (int j = 0; j < 10; j++)
+            {
+                mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                              .clustering("name" + j)
+                              .add("val", "val" + j)
+                              .build());
+            }
+
+            long timestamp = i < 50
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+            if (legacy)
+                LegacyBatchlogMigrator.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations), MessagingService.current_version);
+            else
+                BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), timestamp * 1000, mutations));
+        }
+
+        if (legacy)
+        {
+            Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.LEGACY_BATCHLOG).forceBlockingFlush();
+            LegacyBatchlogMigrator.migrate();
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(100, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(50, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(50, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 100; i++)
+        {
+            String query = String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD1, i);
+            UntypedResultSet result = executeInternal(query);
+            assertNotNull(result);
+            if (i < 50)
+            {
+                Iterator<UntypedResultSet.Row> it = result.iterator();
+                assertNotNull(it);
+                for (int j = 0; j < 10; j++)
+                {
+                    assertTrue(it.hasNext());
+                    UntypedResultSet.Row row = it.next();
+
+                    assertEquals(ByteBufferUtil.bytes(i), row.getBytes("key"));
+                    assertEquals("name" + j, row.getString("name"));
+                    assertEquals("val" + j, row.getString("val"));
+                }
+
+                assertFalse(it.hasNext());
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD1));
+        assertNotNull(result);
+        assertEquals(500, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testTruncatedReplay() throws InterruptedException, ExecutionException
+    {
+        CFMetaData cf2 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD2);
+        CFMetaData cf3 = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD3);
+        // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog.
+        // Each batchlog entry with a mutation for Standard2 and Standard3.
+        // In the middle of the process, 'truncate' Standard2.
+        for (int i = 0; i < 1000; i++)
+        {
+            Mutation mutation1 = new RowUpdateBuilder(cf2, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+            Mutation mutation2 = new RowUpdateBuilder(cf3, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            List<Mutation> mutations = Lists.newArrayList(mutation1, mutation2);
+
+            // Make sure it's ready to be replayed, so adjust the timestamp.
+            long timestamp = System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout();
+
+            if (i == 500)
+                SystemKeyspace.saveTruncationRecord(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2),
+                                                    timestamp,
+                                                    ReplayPosition.NONE);
+
+            // Adjust the timestamp (slightly) to make the test deterministic.
+            if (i >= 500)
+                timestamp++;
+            else
+                timestamp--;
+
+            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i), FBUtilities.timestampMicros(), mutations));
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
+
+        // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD2,i));
+            assertNotNull(result);
+            if (i >= 500)
+            {
+                assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD3, i));
+            assertNotNull(result);
+            assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+            assertEquals("name" + i, result.one().getString("name"));
+            assertEquals("val" + i, result.one().getString("val"));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("deprecation")
+    public void testConversion() throws Exception
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed();
+        CFMetaData cfm = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD4);
+
+        // Generate 1400 version 2.0 mutations and put them all into the batchlog.
+        // Half ready to be replayed, half not.
+        for (int i = 0; i < 1400; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 700
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+
+            Mutation batchMutation = LegacyBatchlogMigrator.getStoreMutation(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+                                                                                               TimeUnit.MILLISECONDS.toMicros(timestamp),
+                                                                                               Collections.singleton(mutation)),
+                                                                             MessagingService.VERSION_20);
+            assertTrue(LegacyBatchlogMigrator.isLegacyBatchlogMutation(batchMutation));
+            LegacyBatchlogMigrator.handleLegacyMutation(batchMutation);
+        }
+
+        // Mix in 100 current version mutations, 50 ready for replay.
+        for (int i = 1400; i < 1500; i++)
+        {
+            Mutation mutation = new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(i))
+                .clustering("name" + i)
+                .add("val", "val" + i)
+                .build();
+
+            long timestamp = i < 1450
+                           ? (System.currentTimeMillis() - BatchlogManager.getBatchlogTimeout())
+                           : (System.currentTimeMillis() + BatchlogManager.getBatchlogTimeout());
+
+
+            BatchlogManager.store(Batch.createLocal(UUIDGen.getTimeUUID(timestamp, i),
+                                                    FBUtilities.timestampMicros(),
+                                                    Collections.singleton(mutation)));
+        }
+
+        // Flush the batchlog to disk (see CASSANDRA-6822).
+        Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.BATCHES).forceBlockingFlush();
+
+        assertEquals(1500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        UntypedResultSet result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertNotNull(result);
+        assertEquals("Count in blog legacy", 0, result.one().getLong("count"));
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertNotNull(result);
+        assertEquals("Count in blog", 1500, result.one().getLong("count"));
+
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.performInitialReplay();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(750, BatchlogManager.instance.countAllBatches() - initialAllBatches);
+        assertEquals(750, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
+
+        for (int i = 0; i < 1500; i++)
+        {
+            result = executeInternal(String.format("SELECT * FROM \"%s\".\"%s\" WHERE key = intAsBlob(%d)", KEYSPACE1, CF_STANDARD4, i));
+            assertNotNull(result);
+            if (i < 700 || i >= 1400 && i < 1450)
+            {
+                assertEquals(ByteBufferUtil.bytes(i), result.one().getBytes("key"));
+                assertEquals("name" + i, result.one().getString("name"));
+                assertEquals("val" + i, result.one().getString("val"));
+            }
+            else
+            {
+                assertTrue("Present at " + i, result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", KEYSPACE1, CF_STANDARD4));
+        assertNotNull(result);
+        assertEquals(750, result.one().getLong("count"));
+
+        // Ensure batchlog is left as expected.
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.BATCHES));
+        assertNotNull(result);
+        assertEquals("Count in blog after initial replay", 750, result.one().getLong("count"));
+        result = executeInternal(String.format("SELECT count(*) FROM \"%s\".\"%s\"", SystemKeyspace.NAME, SystemKeyspace.LEGACY_BATCHLOG));
+        assertNotNull(result);
+        assertEquals("Count in blog legacy after initial replay ", 0, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testAddBatch() throws IOException
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        // Add a batch with 10 mutations
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int j = 0; j < 10; j++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+                          .clustering("name" + j)
+                          .add("val", "val" + j)
+                          .build());
+        }
+
+
+        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+        Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES,
+                                     uuid);
+        UntypedResultSet result = executeInternal(query);
+        assertNotNull(result);
+        assertEquals(1L, result.one().getLong("count"));
+    }
+
+    @Test
+    public void testRemoveBatch()
+    {
+        long initialAllBatches = BatchlogManager.instance.countAllBatches();
+        CFMetaData cfm = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD5).metadata;
+
+        long timestamp = (System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2) * 1000;
+        UUID uuid = UUIDGen.getTimeUUID();
+
+        // Add a batch with 10 mutations
+        List<Mutation> mutations = new ArrayList<>(10);
+        for (int j = 0; j < 10; j++)
+        {
+            mutations.add(new RowUpdateBuilder(cfm, FBUtilities.timestampMicros(), ByteBufferUtil.bytes(j))
+                          .clustering("name" + j)
+                          .add("val", "val" + j)
+                          .build());
+        }
+
+        // Store the batch
+        BatchlogManager.store(Batch.createLocal(uuid, timestamp, mutations));
+        Assert.assertEquals(initialAllBatches + 1, BatchlogManager.instance.countAllBatches());
+
+        // Remove the batch
+        BatchlogManager.remove(uuid);
+
+        assertEquals(initialAllBatches, BatchlogManager.instance.countAllBatches());
+
+        String query = String.format("SELECT count(*) FROM %s.%s where id = %s",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.BATCHES,
+                                     uuid);
+        UntypedResultSet result = executeInternal(query);
+        assertNotNull(result);
+        assertEquals(0L, result.one().getLong("count"));
+    }
+}


Mime
View raw message