cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [4/4] cassandra git commit: Rewrite hinted handoff
Date Wed, 19 Aug 2015 14:27:15 GMT
Rewrite hinted handoff

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-6230


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/96d41f0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/96d41f0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/96d41f0e

Branch: refs/heads/cassandra-3.0
Commit: 96d41f0e0e44d9b3114a5d80dedf12053d36a76b
Parents: 811ee6c
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Sun Nov 23 18:29:38 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Aug 19 17:27:22 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   9 +-
 conf/cassandra.yaml                             |  13 +
 .../org/apache/cassandra/config/Config.java     |   9 +-
 .../cassandra/config/DatabaseDescriptor.java    |  51 +-
 .../apache/cassandra/db/BatchlogManager.java    |  84 ++-
 .../cassandra/db/HintedHandOffManager.java      | 543 +------------------
 .../cassandra/db/HintedHandOffManagerMBean.java |   1 +
 src/java/org/apache/cassandra/db/Mutation.java  |  33 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |  31 +-
 .../cassandra/db/commitlog/CommitLog.java       |  20 +-
 src/java/org/apache/cassandra/gms/Gossiper.java |  14 +
 .../cassandra/hints/ChecksummedDataInput.java   | 114 ++++
 .../cassandra/hints/EncodedHintMessage.java     |  89 +++
 src/java/org/apache/cassandra/hints/Hint.java   | 130 +++++
 .../org/apache/cassandra/hints/HintMessage.java | 130 +++++
 .../apache/cassandra/hints/HintResponse.java    |  58 ++
 .../apache/cassandra/hints/HintVerbHandler.java |  89 +++
 .../org/apache/cassandra/hints/HintsBuffer.java | 261 +++++++++
 .../apache/cassandra/hints/HintsBufferPool.java | 120 ++++
 .../apache/cassandra/hints/HintsCatalog.java    | 128 +++++
 .../apache/cassandra/hints/HintsDescriptor.java | 242 +++++++++
 .../cassandra/hints/HintsDispatchExecutor.java  | 199 +++++++
 .../cassandra/hints/HintsDispatchTrigger.java   |  85 +++
 .../apache/cassandra/hints/HintsDispatcher.java | 228 ++++++++
 .../org/apache/cassandra/hints/HintsReader.java | 312 +++++++++++
 .../apache/cassandra/hints/HintsService.java    | 291 ++++++++++
 .../cassandra/hints/HintsServiceMBean.java      |  43 ++
 .../org/apache/cassandra/hints/HintsStore.java  | 210 +++++++
 .../cassandra/hints/HintsWriteExecutor.java     | 235 ++++++++
 .../org/apache/cassandra/hints/HintsWriter.java | 272 ++++++++++
 .../cassandra/hints/LegacyHintsMigrator.java    | 243 +++++++++
 .../apache/cassandra/hints/package-info.java    |  44 ++
 .../cassandra/metrics/HintedHandoffMetrics.java |   3 +-
 .../cassandra/metrics/HintsServiceMetrics.java  |  25 +
 .../apache/cassandra/net/MessagingService.java  |   9 +-
 .../cassandra/service/CassandraDaemon.java      |   4 +
 .../apache/cassandra/service/StartupChecks.java |  43 +-
 .../apache/cassandra/service/StorageProxy.java  | 209 +++----
 .../cassandra/service/StorageService.java       |  68 ++-
 .../org/apache/cassandra/tools/NodeProbe.java   |   2 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  32 +-
 .../org/apache/cassandra/utils/Throwables.java  |  90 ++-
 test/conf/cassandra.yaml                        |   1 +
 .../cassandra/hints/HintsWriteThenReadTest.java | 191 +++++++
 .../OffsetAwareConfigurationLoader.java         |   1 +
 .../apache/cassandra/db/HintedHandOffTest.java  | 150 -----
 .../cassandra/db/commitlog/CommitLogTest.java   |   4 +-
 .../hints/ChecksummedDataInputTest.java         | 112 ++++
 .../apache/cassandra/hints/HintMessageTest.java |  79 +++
 .../org/apache/cassandra/hints/HintTest.java    | 231 ++++++++
 .../apache/cassandra/hints/HintsBufferTest.java | 236 ++++++++
 .../cassandra/hints/HintsCatalogTest.java       |  88 +++
 .../cassandra/hints/HintsDescriptorTest.java    | 153 ++++++
 .../apache/cassandra/hints/HintsTestUtil.java   |  60 ++
 .../hints/LegacyHintsMigratorTest.java          | 195 +++++++
 .../metrics/HintedHandOffMetricsTest.java       |  56 ++
 .../cassandra/service/StorageProxyTest.java     |   3 +-
 58 files changed, 5469 insertions(+), 908 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 54a6a07..0d17235 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Rewrite hinted handoff (CASSANDRA-6230)
  * Fix query on static compact tables (CASSANDRA-10093)
  * Fix race during construction of commit log (CASSANDRA-10049)
  * Add option to only purge repaired tombstones (CASSANDRA-6434)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 365ed31..61d3180 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -23,6 +23,9 @@ New features
      for non-primary key queries, and perform much better for indexing high
      cardinality columns.
      See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views
+   - Hinted handoff has been completely rewritten. Hints are now stored in flat
+     files, with less overhead for storage and more efficient dispatch.
+     See CASSANDRA-6230 for full details.
    - Option to not purge unrepaired tombstones. To avoid users having data resurrected
      if repair has not been run within gc_grace_seconds, an option has been added to
      only allow tombstones from repaired sstables to be purged. To enable, set the
@@ -30,10 +33,11 @@ New features
      you do not run repair for a long time, you will keep all tombstones around which
      can cause other problems.
 
-
 Upgrading
 ---------
-   - 3.0 requires Java 8u20 or later.
+   - Max mutation size is now configurable via the max_mutation_size_in_kb setting in
+     cassandra.yaml; the default is half the size of commitlog_segment_size_in_mb * 1024.
+   - 3.0 requires Java 8u40 or later.
    - The default JVM GC has been changed to G1GC.
    - The default JVM flag -XX:+PerfDisableSharedMem will cause the following tools JVM
      to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia.
@@ -91,6 +95,7 @@ Upgrading
      set/getCompactionParameters or set/getCompactionParametersJson instead.
    - SizeTieredCompactionStrategy parameter cold_reads_to_omit has been removed.
 
+
 2.2
 ===
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 67c37bc..58a343a 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -53,17 +53,30 @@ hinted_handoff_enabled: true
 # generated.  After it has been dead this long, new hints for it will not be
 # created until it has been seen alive and gone down again.
 max_hint_window_in_ms: 10800000 # 3 hours
+
 # Maximum throttle in KBs per second, per delivery thread.  This will be
 # reduced proportionally to the number of nodes in the cluster.  (If there
 # are two nodes in the cluster, each delivery thread will use the maximum
 # rate; if there are three, each will throttle to half of the maximum,
 # since we expect two nodes to be delivering hints simultaneously.)
 hinted_handoff_throttle_in_kb: 1024
+
 # Number of threads with which to deliver hints;
 # Consider increasing this number when you have multi-dc deployments, since
 # cross-dc handoff tends to be slower
 max_hints_delivery_threads: 2
 
+# Directory where Cassandra should store hints.
+# If not set, the default directory is $CASSANDRA_HOME/data/hints.
+# hints_directory: /var/lib/cassandra/hints
+
+# How often hints should be flushed from the internal buffers to disk.
+# Will *not* trigger fsync.
+hints_flush_period_in_ms: 10000
+
+# Maximum size for a single hints file, in megabytes.
+max_hints_file_size_in_mb: 128
+
 # Maximum throttle in KBs per second, total. This will be
 # reduced proportionally to the number of nodes in the cluster.
 batchlog_replay_throttle_in_kb: 1024

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index e93d090..762935d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -56,6 +56,7 @@ public class Config
     public volatile boolean hinted_handoff_enabled = true;
     public Set<String> hinted_handoff_disabled_datacenters = Sets.newConcurrentHashSet();
     public volatile Integer max_hint_window_in_ms = 3 * 3600 * 1000; // three hours
+    public String hints_directory;
 
     public ParameterizedClass seed_provider;
     public DiskAccessMode disk_access_mode = DiskAccessMode.auto;
@@ -169,7 +170,9 @@ public class Config
     public int commitlog_segment_size_in_mb = 32;
     public ParameterizedClass commitlog_compression;
     public int commitlog_max_compression_buffers_in_pool = 3;
- 
+
+    public Integer max_mutation_size_in_kb;
+
     @Deprecated
     public int commitlog_periodic_queue_size = -1;
 
@@ -195,7 +198,9 @@ public class Config
 
     public int hinted_handoff_throttle_in_kb = 1024;
     public int batchlog_replay_throttle_in_kb = 1024;
-    public int max_hints_delivery_threads = 1;
+    public int max_hints_delivery_threads = 2;
+    public int hints_flush_period_in_ms = 10000;
+    public int max_hints_file_size_in_mb = 128;
     public int sstable_preemptive_open_interval_in_mb = 50;
 
     public volatile boolean incremental_backups = false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 9266785..b3bc4d2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -482,6 +482,15 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("commitlog_directory is missing and -Dcassandra.storagedir is not set", false);
             conf.commitlog_directory += File.separator + "commitlog";
         }
+
+        if (conf.hints_directory == null)
+        {
+            conf.hints_directory = System.getProperty("cassandra.storagedir", null);
+            if (conf.hints_directory == null)
+                throw new ConfigurationException("hints_directory is missing and -Dcassandra.storagedir is not set", false);
+            conf.hints_directory += File.separator + "hints";
+        }
+
         if (conf.saved_caches_directory == null)
         {
             conf.saved_caches_directory = System.getProperty("cassandra.storagedir", null);
@@ -502,12 +511,18 @@ public class DatabaseDescriptor
         {
             if (datadir.equals(conf.commitlog_directory))
                 throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories", false);
+            if (datadir.equals(conf.hints_directory))
+                throw new ConfigurationException("hints_directory must not be the same as any data_file_directories", false);
             if (datadir.equals(conf.saved_caches_directory))
                 throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories", false);
         }
 
         if (conf.commitlog_directory.equals(conf.saved_caches_directory))
             throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory", false);
+        if (conf.commitlog_directory.equals(conf.hints_directory))
+            throw new ConfigurationException("hints_directory must not be the same as the commitlog_directory", false);
+        if (conf.hints_directory.equals(conf.saved_caches_directory))
+            throw new ConfigurationException("saved_caches_directory must not be the same as the hints_directory", false);
 
         if (conf.memtable_flush_writers == null)
             conf.memtable_flush_writers = Math.min(8, Math.max(2, Math.min(FBUtilities.getAvailableProcessors(), conf.data_file_directories.length)));
@@ -613,6 +628,11 @@ public class DatabaseDescriptor
 
         if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
             throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
+
+        if (conf.max_mutation_size_in_kb == null)
+            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
+        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
+            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
     }
 
     private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException
@@ -708,18 +728,18 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("At least one DataFileDirectory must be specified", false);
 
             for (String dataFileDirectory : conf.data_file_directories)
-            {
                 FileUtils.createDirectory(dataFileDirectory);
-            }
 
             if (conf.commitlog_directory == null)
                 throw new ConfigurationException("commitlog_directory must be specified", false);
-
             FileUtils.createDirectory(conf.commitlog_directory);
 
+            if (conf.hints_directory == null)
+                throw new ConfigurationException("hints_directory must be specified", false);
+            FileUtils.createDirectory(conf.hints_directory);
+
             if (conf.saved_caches_directory == null)
                 throw new ConfigurationException("saved_caches_directory must be specified", false);
-
             FileUtils.createDirectory(conf.saved_caches_directory);
         }
         catch (ConfigurationException e)
@@ -992,6 +1012,7 @@ public class DatabaseDescriptor
             case PAXOS_PREPARE:
             case PAXOS_PROPOSE:
             case BATCHLOG_MUTATION:
+            case HINT:
                 return getWriteRpcTimeout();
             case COUNTER_MUTATION:
                 return getCounterWriteRpcTimeout();
@@ -1119,6 +1140,11 @@ public class DatabaseDescriptor
         return conf.commitlog_max_compression_buffers_in_pool;
     }
 
+    public static int getMaxMutationSize()
+    {
+        return conf.max_mutation_size_in_kb * 1024;
+    }
+
     public static int getTombstoneWarnThreshold()
     {
         return conf.tombstone_warn_threshold;
@@ -1415,6 +1441,11 @@ public class DatabaseDescriptor
         return conf.max_hint_window_in_ms;
     }
 
+    public static File getHintsDirectory()
+    {
+        return new File(conf.hints_directory);
+    }
+
     public static File getSerializedCachePath(String ksName,
                                               String cfName,
                                               UUID cfId,
@@ -1484,11 +1515,21 @@ public class DatabaseDescriptor
         conf.hinted_handoff_throttle_in_kb = throttleInKB;
     }
 
-    public static int getMaxHintsThread()
+    public static int getMaxHintsDeliveryThreads()
     {
         return conf.max_hints_delivery_threads;
     }
 
+    public static int getHintsFlushPeriodInMS()
+    {
+        return conf.hints_flush_period_in_ms;
+    }
+
+    public static long getMaxHintsFileSize()
+    {
+        return conf.max_hints_file_size_in_mb * 1024 * 1024;
+    }
+
     public static boolean isIncrementalBackupsEnabled()
     {
         return conf.incremental_backups;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 8ea4318..de85925 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -41,18 +41,20 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.hints.Hint;
+import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static com.google.common.collect.Iterables.transform;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternalWithPaging;
 
@@ -210,6 +212,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         int positionInPage = 0;
         ArrayList<Batch> unfinishedBatches = new ArrayList<>(pageSize);
 
+        Set<InetAddress> hintedNodes = new HashSet<>();
+        Set<UUID> replayedBatches = new HashSet<>();
+
         // Sending out batches for replay without waiting for them, so that one stuck batch doesn't affect others
         for (UntypedResultSet.Row row : batches)
         {
@@ -218,7 +223,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             Batch batch = new Batch(id, row.getBytes("data"), version);
             try
             {
-                if (batch.replay(rateLimiter) > 0)
+                if (batch.replay(rateLimiter, hintedNodes) > 0)
                 {
                     unfinishedBatches.add(batch);
                 }
@@ -238,21 +243,29 @@ public class BatchlogManager implements BatchlogManagerMBean
             {
                 // We have reached the end of a batch. To avoid keeping more than a page of mutations in memory,
                 // finish processing the page before requesting the next row.
-                finishAndClearBatches(unfinishedBatches);
+                finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
                 positionInPage = 0;
             }
         }
-        finishAndClearBatches(unfinishedBatches);
+
+        finishAndClearBatches(unfinishedBatches, hintedNodes, replayedBatches);
+
+        // to preserve batch guarantees, we must ensure that hints (if any) have made it to disk, before deleting the batches
+        HintsService.instance.flushAndFsyncBlockingly(transform(hintedNodes, StorageService.instance::getHostIdForEndpoint));
+
+        // once all generated hints are fsynced, actually delete the batches
+        replayedBatches.forEach(BatchlogManager::deleteBatch);
     }
 
-    private void finishAndClearBatches(ArrayList<Batch> batches)
+    private void finishAndClearBatches(ArrayList<Batch> batches, Set<InetAddress> hintedNodes, Set<UUID> replayedBatches)
     {
         // schedule hints for timed out deliveries
         for (Batch batch : batches)
         {
-            batch.finish();
-            deleteBatch(batch.id);
+            batch.finish(hintedNodes);
+            replayedBatches.add(batch.id);
         }
+
         totalBatchesReplayed += batches.size();
         batches.clear();
     }
@@ -279,7 +292,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             this.version = version;
         }
 
-        public int replay(RateLimiter rateLimiter) throws IOException
+        public int replay(RateLimiter rateLimiter, Set<InetAddress> hintedNodes) throws IOException
         {
             logger.debug("Replaying batch {}", id);
 
@@ -288,18 +301,18 @@ public class BatchlogManager implements BatchlogManagerMBean
             if (mutations.isEmpty())
                 return 0;
 
-            int ttl = calculateHintTTL(mutations);
-            if (ttl <= 0)
+            int gcgs = gcgs(mutations);
+            if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs <= FBUtilities.nowInSeconds())
                 return 0;
 
-            replayHandlers = sendReplays(mutations, writtenAt, ttl);
+            replayHandlers = sendReplays(mutations, writtenAt, hintedNodes);
 
             rateLimiter.acquire(data.remaining()); // acquire afterwards, to not mess up ttl calculation.
 
             return replayHandlers.size();
         }
 
-        public void finish()
+        public void finish(Set<InetAddress> hintedNodes)
         {
             for (int i = 0; i < replayHandlers.size(); i++)
             {
@@ -313,7 +326,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                     logger.debug("Failed replaying a batched mutation to a node, will write a hint");
                     logger.debug("Failure was : {}", e.getMessage());
                     // writing hints for the rest to hints, starting from i
-                    writeHintsForUndeliveredEndpoints(i);
+                    writeHintsForUndeliveredEndpoints(i, hintedNodes);
                     return;
                 }
             }
@@ -341,7 +354,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             return mutations;
         }
 
-        private void writeHintsForUndeliveredEndpoints(int startFrom)
+        private void writeHintsForUndeliveredEndpoints(int startFrom, Set<InetAddress> hintedNodes)
         {
             try
             {
@@ -353,12 +366,15 @@ public class BatchlogManager implements BatchlogManagerMBean
                 for (int i = startFrom; i < replayHandlers.size(); i++)
                 {
                     Mutation undeliveredMutation = replayingMutations.get(i);
-                    int ttl = calculateHintTTL(replayingMutations);
+                    int gcgs = gcgs(replayingMutations);
                     ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 
-                    if (ttl > 0 && handler != null)
-                        for (InetAddress endpoint : handler.undelivered)
-                            StorageProxy.writeHintForMutation(undeliveredMutation, writtenAt, ttl, endpoint);
+                    if (TimeUnit.MILLISECONDS.toSeconds(writtenAt) + gcgs > FBUtilities.nowInSeconds() && handler != null)
+                    {
+                        hintedNodes.addAll(handler.undelivered);
+                        HintsService.instance.write(transform(handler.undelivered, StorageService.instance::getHostIdForEndpoint),
+                                                    Hint.create(undeliveredMutation, writtenAt));
+                    }
                 }
             }
             catch (IOException e)
@@ -367,12 +383,14 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
         }
 
-        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
+        private static List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations,
+                                                                              long writtenAt,
+                                                                              Set<InetAddress> hintedNodes)
         {
             List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
             for (Mutation mutation : mutations)
             {
-                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, hintedNodes);
                 if (handler != null)
                     handlers.add(handler);
             }
@@ -385,7 +403,9 @@ public class BatchlogManager implements BatchlogManagerMBean
          *
          * @return direct delivery handler to wait on or null, if no live nodes found
          */
-        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        private static ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation,
+                                                                                     long writtenAt,
+                                                                                     Set<InetAddress> hintedNodes)
         {
             Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
@@ -395,11 +415,19 @@ public class BatchlogManager implements BatchlogManagerMBean
                                                          StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))
             {
                 if (endpoint.equals(FBUtilities.getBroadcastAddress()))
+                {
                     mutation.apply();
+                }
                 else if (FailureDetector.instance.isAlive(endpoint))
+                {
                     liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint.
+                }
                 else
-                    StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint);
+                {
+                    hintedNodes.add(endpoint);
+                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(endpoint),
+                                                Hint.create(mutation, writtenAt));
+                }
             }
 
             if (liveEndpoints.isEmpty())
@@ -412,16 +440,12 @@ public class BatchlogManager implements BatchlogManagerMBean
             return handler;
         }
 
-        /*
-         * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog).
-         * This ensures that deletes aren't "undone" by an old batch replay.
-         */
-        private int calculateHintTTL(Collection<Mutation> mutations)
+        private static int gcgs(Collection<Mutation> mutations)
         {
-            int unadjustedTTL = Integer.MAX_VALUE;
+            int gcgs = Integer.MAX_VALUE;
             for (Mutation mutation : mutations)
-                unadjustedTTL = Math.min(unadjustedTTL, HintedHandOffManager.calculateHintTTL(mutation));
-            return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
+                gcgs = Math.min(gcgs, mutation.smallestGCGS());
+            return gcgs;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 8bea2e8..3279acf 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -17,156 +17,32 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.compaction.CompactionManager;
-
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UUIDType;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.gms.ApplicationState;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.gms.Gossiper;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.metrics.HintedHandoffMetrics;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.WriteResponseHandler;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+import org.apache.cassandra.hints.HintsService;
 
 /**
- * The hint schema looks like this:
- *
- * CREATE TABLE hints (
- *   target_id uuid,
- *   hint_id timeuuid,
- *   message_version int,
- *   mutation blob,
- *   PRIMARY KEY (target_id, hint_id, message_version)
- * ) WITH COMPACT STORAGE;
- *
- * Thus, for each node in the cluster we treat its uuid as the partition key; each hint is a logical row
- * (physical composite column) containing the mutation to replay and associated metadata.
+ * A proxy class that implement the deprecated legacy HintedHandoffManagerMBean interface.
  *
- * When FailureDetector signals that a node that was down is back up, we page through
- * the hinted mutations and send them over one at a time, waiting for
- * hinted_handoff_throttle_delay in between each.
- *
- * deliverHints is also exposed to JMX so it can be run manually if FD ever misses
- * its cue somehow.
+ * TODO: remove in 4.0.
  */
-
-public class HintedHandOffManager implements HintedHandOffManagerMBean
+@SuppressWarnings("deprecation")
+@Deprecated
+public final class HintedHandOffManager implements HintedHandOffManagerMBean
 {
-    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=HintedHandoffManager";
     public static final HintedHandOffManager instance = new HintedHandOffManager();
 
-    private static final Logger logger = LoggerFactory.getLogger(HintedHandOffManager.class);
-
-    private static final int MAX_SIMULTANEOUSLY_REPLAYED_HINTS = 128;
-    private static final int LARGE_NUMBER = 65536; // 64k nodes ought to be enough for anybody.
-
-    public final HintedHandoffMetrics metrics = new HintedHandoffMetrics();
-
-    private volatile boolean hintedHandOffPaused = false;
-
-    static final int maxHintTTL = Integer.parseInt(System.getProperty("cassandra.maxHintTTL", String.valueOf(Integer.MAX_VALUE)));
-
-    private final NonBlockingHashSet<InetAddress> queuedDeliveries = new NonBlockingHashSet<>();
-
-    // To keep metrics consistent with earlier versions, where periodic tasks were run on a shared executor,
-    // we run them on this executor and so keep counts separate from those for hint delivery tasks. See CASSANDRA-9129
-    private final DebuggableScheduledThreadPoolExecutor executor =
-        new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HintedHandoffManager", Thread.MIN_PRIORITY));
-
-    // Non-scheduled executor to run the actual hint delivery tasks.
-    // Per CASSANDRA-9129, this is where the values displayed in nodetool tpstats
-    // and via the HintedHandoff mbean are obtained.
-    private final ThreadPoolExecutor hintDeliveryExecutor =
-        new JMXEnabledThreadPoolExecutor(
-            DatabaseDescriptor.getMaxHintsThread(),
-            Integer.MAX_VALUE,
-            TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>(),
-            new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
-            "internal");
-
-    private final ColumnFamilyStore hintStore = Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS);
-
-    private static final ColumnDefinition hintColumn = SystemKeyspace.Hints.compactValueColumn();
-
-    /**
-     * Returns a mutation representing a Hint to be sent to <code>targetId</code>
-     * as soon as it becomes available again.
-     */
-    public Mutation hintFor(Mutation mutation, long now, int ttl, UUID targetId)
-    {
-        assert ttl > 0;
-
-        InetAddress endpoint = StorageService.instance.getTokenMetadata().getEndpointForHostId(targetId);
-        // during tests we may not have a matching endpoint, but this would be unexpected in real clusters
-        if (endpoint != null)
-            metrics.incrCreatedHints(endpoint);
-        else
-            logger.warn("Unable to find matching endpoint for target {} when storing a hint", targetId);
-
-        UUID hintId = UUIDGen.getTimeUUID();
-        // serialize the hint with id and version as a composite column name
-
-        ByteBuffer key = UUIDType.instance.decompose(targetId);
-        Clustering clustering = SystemKeyspace.Hints.comparator.make(hintId, MessagingService.current_version);
-        ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
-        Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value);
-
-        return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeRow.singleCellRow(clustering, cell)));
-    }
+    public static final String MBEAN_NAME = "org.apache.cassandra.db:type=HintedHandoffManager";
 
-    /*
-     * determine the TTL for the hint Mutation
-     * this is set at the smallest GCGraceSeconds for any of the CFs in the RM
-     * this ensures that deletes aren't "undone" by delivery of an old hint
-     */
-    public static int calculateHintTTL(Mutation mutation)
+    private HintedHandOffManager()
     {
-        int ttl = maxHintTTL;
-        for (PartitionUpdate upd : mutation.getPartitionUpdates())
-            ttl = Math.min(ttl, upd.metadata().params.gcGraceSeconds);
-        return ttl;
     }
 
-    public void start()
+    public void registerMBean()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -177,404 +53,35 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         {
             throw new RuntimeException(e);
         }
-        logger.debug("Created HHOM instance, registered MBean.");
-
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                scheduleAllDeliveries();
-                metrics.log();
-            }
-        };
-        executor.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
-    }
-
-    private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp)
-    {
-        Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds());
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, tokenBytes, BTreeRow.singleCellRow(clustering, cell));
-        new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
-    }
-
-    public void deleteHintsForEndpoint(final String ipOrHostname)
-    {
-        try
-        {
-            InetAddress endpoint = InetAddress.getByName(ipOrHostname);
-            deleteHintsForEndpoint(endpoint);
-        }
-        catch (UnknownHostException e)
-        {
-            logger.warn("Unable to find {}, not a hostname or ipaddr of a node", ipOrHostname);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void deleteHintsForEndpoint(final InetAddress endpoint)
-    {
-        if (!StorageService.instance.getTokenMetadata().isMember(endpoint))
-            return;
-        UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
-        ByteBuffer key = ByteBuffer.wrap(UUIDGen.decompose(hostId));
-        final Mutation mutation = new Mutation(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Hints, key, System.currentTimeMillis(), FBUtilities.nowInSeconds()));
-
-        // execute asynchronously to avoid blocking caller (which may be processing gossip)
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    logger.info("Deleting any stored hints for {}", endpoint);
-                    mutation.apply();
-                    hintStore.forceBlockingFlush();
-                    compact();
-                }
-                catch (Exception e)
-                {
-                    JVMStabilityInspector.inspectThrowable(e);
-                    logger.warn("Could not delete hints for {}: {}", endpoint, e);
-                }
-            }
-        };
-        executor.submit(runnable);
-    }
-
-    //foobar
-    public void truncateAllHints() throws ExecutionException, InterruptedException
-    {
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    logger.info("Truncating all stored hints.");
-                    Keyspace.open(SystemKeyspace.NAME).getColumnFamilyStore(SystemKeyspace.HINTS).truncateBlocking();
-                }
-                catch (Exception e)
-                {
-                    logger.warn("Could not truncate all hints.", e);
-                }
-            }
-        };
-        executor.submit(runnable).get();
-    }
-
-    @VisibleForTesting
-    protected synchronized void compact()
-    {
-        ArrayList<Descriptor> descriptors = new ArrayList<>();
-        for (SSTable sstable : hintStore.getTracker().getUncompacting())
-            descriptors.add(sstable.descriptor);
-
-        if (descriptors.isEmpty())
-            return;
-
-        try
-        {
-            CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis() / 1000)).get();
-        }
-        catch (InterruptedException | ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private int waitForSchemaAgreement(InetAddress endpoint) throws TimeoutException
-    {
-        Gossiper gossiper = Gossiper.instance;
-        int waited = 0;
-        // first, wait for schema to be gossiped.
-        while (gossiper.getEndpointStateForEndpoint(endpoint) != null && gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA) == null)
-        {
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-            waited += 1000;
-            if (waited > 2 * StorageService.RING_DELAY)
-                throw new TimeoutException("Didin't receive gossiped schema from " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
-        }
-        if (gossiper.getEndpointStateForEndpoint(endpoint) == null)
-            throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
-        waited = 0;
-        // then wait for the correct schema version.
-        // usually we use DD.getDefsVersion, which checks the local schema uuid as stored in the system keyspace.
-        // here we check the one in gossip instead; this serves as a canary to warn us if we introduce a bug that
-        // causes the two to diverge (see CASSANDRA-2946)
-        while (gossiper.getEndpointStateForEndpoint(endpoint) != null && !gossiper.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.SCHEMA).value.equals(
-                gossiper.getEndpointStateForEndpoint(FBUtilities.getBroadcastAddress()).getApplicationState(ApplicationState.SCHEMA).value))
-        {
-            Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-            waited += 1000;
-            if (waited > 2 * StorageService.RING_DELAY)
-                throw new TimeoutException("Could not reach schema agreement with " + endpoint + " in " + 2 * StorageService.RING_DELAY + "ms");
-        }
-        if (gossiper.getEndpointStateForEndpoint(endpoint) == null)
-            throw new TimeoutException("Node " + endpoint + " vanished while waiting for agreement");
-        logger.debug("schema for {} matches local schema", endpoint);
-        return waited;
     }
 
-    private void deliverHintsToEndpoint(InetAddress endpoint)
+    public void deleteHintsForEndpoint(String host)
     {
-        if (hintStore.isEmpty())
-            return; // nothing to do, don't confuse users by logging a no-op handoff
-
-        // check if hints delivery has been paused
-        if (hintedHandOffPaused)
-        {
-            logger.debug("Hints delivery process is paused, aborting");
-            return;
-        }
-
-        logger.debug("Checking remote({}) schema before delivering hints", endpoint);
-        try
-        {
-            waitForSchemaAgreement(endpoint);
-        }
-        catch (TimeoutException e)
-        {
-            return;
-        }
-
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Endpoint {} died before hint delivery, aborting", endpoint);
-            return;
-        }
-
-        doDeliverHintsToEndpoint(endpoint);
-
-        // Flush all the tombstones to disk
-        hintStore.forceBlockingFlush();
+        HintsService.instance.deleteAllHintsForEndpoint(host);
     }
 
-    private boolean checkDelivered(InetAddress endpoint, List<WriteResponseHandler<Mutation>> handlers, AtomicInteger rowsReplayed)
+    public void truncateAllHints()
     {
-        for (WriteResponseHandler<Mutation> handler : handlers)
-        {
-            try
-            {
-                handler.get();
-            }
-            catch (WriteTimeoutException e)
-            {
-                logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
-                            endpoint, rowsReplayed, e.getMessage());
-                return false;
-            }
-        }
-        return true;
+        HintsService.instance.deleteAllHints();
     }
 
-    /*
-     * 1. Get the key of the endpoint we need to handoff
-     * 2. For each column, deserialize the mutation and send it to the endpoint
-     * 3. Delete the column if the write was successful
-     * 4. Force a flush
-     */
-    private void doDeliverHintsToEndpoint(InetAddress endpoint)
-    {
-        // find the hints for the node using its token.
-        UUID hostId = Gossiper.instance.getHostId(endpoint);
-        logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
-        final ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
-
-        final AtomicInteger rowsReplayed = new AtomicInteger(0);
-
-        // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
-        // max rate is scaled by the number of nodes in the cluster (CASSANDRA-5272).
-        int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB()
-                           / (StorageService.instance.getTokenMetadata().getAllEndpoints().size() - 1);
-        RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
-
-        int nowInSec = FBUtilities.nowInSeconds();
-        try (OpOrder.Group op = hintStore.readOrdering.start();
-             RowIterator iter = UnfilteredRowIterators.filter(SinglePartitionReadCommand.fullPartitionRead(SystemKeyspace.Hints, nowInSec, hostIdBytes).queryMemtableAndDisk(hintStore, op), nowInSec))
-        {
-            List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
-
-            while (iter.hasNext())
-            {
-                // check if node is still alive and we should continue delivery process
-                if (!FailureDetector.instance.isAlive(endpoint))
-                {
-                    logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
-                    return;
-                }
-
-                // check if hints delivery has been paused during the process
-                if (hintedHandOffPaused)
-                {
-                    logger.debug("Hints delivery process is paused, aborting");
-                    return;
-                }
-
-                // Wait regularly on the endpoint acknowledgment. If we timeout on it, the endpoint is probably dead so stop delivery
-                if (responseHandlers.size() > MAX_SIMULTANEOUSLY_REPLAYED_HINTS && !checkDelivered(endpoint, responseHandlers, rowsReplayed))
-                    return;
-
-                final Row hint = iter.next();
-                int version = Int32Type.instance.compose(hint.clustering().get(1));
-                Cell cell = hint.getCell(hintColumn);
-
-                final long timestamp = cell.timestamp();
-                DataInputPlus in = new DataInputBuffer(cell.value(), true);
-                Mutation mutation;
-                try
-                {
-                    mutation = Mutation.serializer.deserialize(in, version);
-                }
-                catch (UnknownColumnFamilyException e)
-                {
-                    logger.debug("Skipping delivery of hint for deleted table", e);
-                    deleteHint(hostIdBytes, hint.clustering(), timestamp);
-                    continue;
-                }
-                catch (IOException e)
-                {
-                    throw new AssertionError(e);
-                }
-
-                for (UUID cfId : mutation.getColumnFamilyIds())
-                {
-                    if (timestamp <= SystemKeyspace.getTruncatedAt(cfId))
-                    {
-                        logger.debug("Skipping delivery of hint for truncated table {}", cfId);
-                        mutation = mutation.without(cfId);
-                    }
-                }
-
-                if (mutation.isEmpty())
-                {
-                    deleteHint(hostIdBytes, hint.clustering(), timestamp);
-                    continue;
-                }
-
-                MessageOut<Mutation> message = mutation.createMessage();
-                rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
-                Runnable callback = new Runnable()
-                {
-                    public void run()
-                    {
-                        rowsReplayed.incrementAndGet();
-                        deleteHint(hostIdBytes, hint.clustering(), timestamp);
-                    }
-                };
-                WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback);
-                MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
-                responseHandlers.add(responseHandler);
-            }
-
-            // Wait on the last handlers
-            if (checkDelivered(endpoint, responseHandlers, rowsReplayed))
-                logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
-        }
-    }
-
-    /**
-     * Attempt delivery to any node for which we have hints.  Necessary since we can generate hints even for
-     * nodes which are never officially down/failed.
-     */
-    private void scheduleAllDeliveries()
-    {
-        logger.debug("Started scheduleAllDeliveries");
-
-        // Force a major compaction to get rid of the tombstones and expired hints. Do it once, before we schedule any
-        // individual replay, to avoid N - 1 redundant individual compactions (when N is the number of nodes with hints
-        // to deliver to).
-        compact();
-
-        ReadCommand cmd = new PartitionRangeReadCommand(hintStore.metadata,
-                                                        FBUtilities.nowInSeconds(),
-                                                        ColumnFilter.all(hintStore.metadata),
-                                                        RowFilter.NONE,
-                                                        DataLimits.cqlLimits(Integer.MAX_VALUE, 1),
-                                                        DataRange.allData(hintStore.metadata.partitioner));
-
-        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
-        {
-            while (iter.hasNext())
-            {
-                try (UnfilteredRowIterator partition = iter.next())
-                {
-                    UUID hostId = UUIDGen.getUUID(partition.partitionKey().getKey());
-                    InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
-                    // token may have since been removed (in which case we have just read back a tombstone)
-                    if (target != null)
-                        scheduleHintDelivery(target, false);
-                }
-            }
-        }
-
-        logger.debug("Finished scheduleAllDeliveries");
-    }
-
-    /*
-     * This method is used to deliver hints to a particular endpoint.
-     * When we learn that some endpoint is back up we deliver the data
-     * to him via an event driven mechanism.
-    */
-    public void scheduleHintDelivery(final InetAddress to, final boolean precompact)
-    {
-        // We should not deliver hints to the same host in 2 different threads
-        if (!queuedDeliveries.add(to))
-            return;
-
-        logger.debug("Scheduling delivery of Hints to {}", to);
-
-        hintDeliveryExecutor.execute(new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    // If it's an individual node hint replay (triggered by Gossip or via JMX), and not the global scheduled replay
-                    // (every 10 minutes), force a major compaction to get rid of the tombstones and expired hints.
-                    if (precompact)
-                        compact();
-
-                    deliverHintsToEndpoint(to);
-                }
-                finally
-                {
-                    queuedDeliveries.remove(to);
-                }
-            }
-        });
-    }
-
-    public void scheduleHintDelivery(String to) throws UnknownHostException
+    // TODO
+    public List<String> listEndpointsPendingHints()
     {
-        scheduleHintDelivery(InetAddress.getByName(to), true);
+        throw new UnsupportedOperationException();
     }
 
-    public void pauseHintsDelivery(boolean b)
+    // TODO
+    public void scheduleHintDelivery(String host)
     {
-        hintedHandOffPaused = b;
+        throw new UnsupportedOperationException();
     }
 
-    public List<String> listEndpointsPendingHints()
+    public void pauseHintsDelivery(boolean doPause)
     {
-        // Extract the keys as strings to be reported.
-        List<String> result = new ArrayList<>();
-
-        ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
-        try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
-             UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
-        {
-            while (iter.hasNext())
-            {
-                try (UnfilteredRowIterator partition = iter.next())
-                {
-                    // We don't delete by range on the hints table, so we don't have to worry about the
-                    // iterator returning only range tombstone marker
-                    if (partition.hasNext())
-                        result.add(UUIDType.instance.compose(partition.partitionKey().getKey()).toString());
-                }
-            }
-        }
-
-        return result;
+        if (doPause)
+            HintsService.instance.pauseDispatch();
+        else
+            HintsService.instance.resumeDispatch();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java b/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
index bbb2a14..9ba425e 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
@@ -21,6 +21,7 @@ import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+@Deprecated
 public interface HintedHandOffManagerMBean
 {
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 709c78f..6e78b0e 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -75,10 +75,24 @@ public class Mutation implements IMutation
 
     public Mutation copy()
     {
-        Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
+        return new Mutation(keyspaceName, key, new HashMap<>(modifications));
+    }
+
+    public Mutation without(Set<UUID> cfIds)
+    {
+        if (cfIds.isEmpty())
+            return this;
+
+        Mutation copy = copy();
+        copy.modifications.keySet().removeAll(cfIds);
         return copy;
     }
 
+    public Mutation without(UUID cfId)
+    {
+        return without(Collections.singleton(cfId));
+    }
+
     public String getKeyspaceName()
     {
         return keyspaceName;
@@ -207,6 +221,14 @@ public class Mutation implements IMutation
         return DatabaseDescriptor.getWriteRpcTimeout();
     }
 
+    public int smallestGCGS()
+    {
+        int gcgs = Integer.MAX_VALUE;
+        for (PartitionUpdate update : getPartitionUpdates())
+            gcgs = Math.min(gcgs, update.metadata().params.gcGraceSeconds);
+        return gcgs;
+    }
+
     public String toString()
     {
         return toString(false);
@@ -235,15 +257,6 @@ public class Mutation implements IMutation
         return buff.append("])").toString();
     }
 
-    public Mutation without(UUID cfId)
-    {
-        Mutation mutation = new Mutation(keyspaceName, key);
-        for (Map.Entry<UUID, PartitionUpdate> entry : modifications.entrySet())
-            if (!entry.getKey().equals(cfId))
-                mutation.add(entry.getValue());
-        return mutation;
-    }
-
     public static class MutationSerializer implements IVersionedSerializer<Mutation>
     {
         public void serialize(Mutation mutation, DataOutputPlus out, int version) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index d24b18b..38cfed6 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -88,7 +88,6 @@ public final class SystemKeyspace
 
     public static final String NAME = "system";
 
-    public static final String HINTS = "hints";
     public static final String BATCHES = "batches";
     public static final String PAXOS = "paxos";
     public static final String BUILT_INDEXES = "IndexInfo";
@@ -103,6 +102,7 @@ public final class SystemKeyspace
     public static final String MATERIALIZED_VIEWS_BUILDS_IN_PROGRESS = "materialized_views_builds_in_progress";
     public static final String BUILT_MATERIALIZED_VIEWS = "built_materialized_views";
 
+    @Deprecated public static final String LEGACY_HINTS = "hints";
     @Deprecated public static final String LEGACY_BATCHLOG = "batchlog";
     @Deprecated public static final String LEGACY_KEYSPACES = "schema_keyspaces";
     @Deprecated public static final String LEGACY_COLUMNFAMILIES = "schema_columnfamilies";
@@ -112,19 +112,6 @@ public final class SystemKeyspace
     @Deprecated public static final String LEGACY_FUNCTIONS = "schema_functions";
     @Deprecated public static final String LEGACY_AGGREGATES = "schema_aggregates";
 
-    public static final CFMetaData Hints =
-        compile(HINTS,
-                "hints awaiting delivery",
-                "CREATE TABLE %s ("
-                + "target_id uuid,"
-                + "hint_id timeuuid,"
-                + "message_version int,"
-                + "mutation blob,"
-                + "PRIMARY KEY ((target_id), hint_id, message_version)) "
-                + "WITH COMPACT STORAGE")
-                .compaction(CompactionParams.scts(singletonMap("enabled", "false")))
-                .gcGraceSeconds(0);
-
     public static final CFMetaData Batches =
         compile(BATCHES,
                 "batches awaiting replay",
@@ -282,6 +269,20 @@ public final class SystemKeyspace
                 + "PRIMARY KEY ((keyspace_name), view_name))");
 
     @Deprecated
+    public static final CFMetaData LegacyHints =
+        compile(LEGACY_HINTS,
+                "*DEPRECATED* hints awaiting delivery",
+                "CREATE TABLE %s ("
+                + "target_id uuid,"
+                + "hint_id timeuuid,"
+                + "message_version int,"
+                + "mutation blob,"
+                + "PRIMARY KEY ((target_id), hint_id, message_version)) "
+                + "WITH COMPACT STORAGE")
+                .compaction(CompactionParams.scts(singletonMap("enabled", "false")))
+                .gcGraceSeconds(0);
+
+    @Deprecated
     public static final CFMetaData LegacyBatchlog =
         compile(LEGACY_BATCHLOG,
                 "*DEPRECATED* batchlog entries",
@@ -423,7 +424,6 @@ public final class SystemKeyspace
     private static Tables tables()
     {
         return Tables.of(BuiltIndexes,
-                         Hints,
                          Batches,
                          Paxos,
                          Local,
@@ -436,6 +436,7 @@ public final class SystemKeyspace
                          AvailableRanges,
                          MaterializedViewsBuildsInProgress,
                          BuiltMaterializedViews,
+                         LegacyHints,
                          LegacyBatchlog,
                          LegacyKeyspaces,
                          LegacyColumnfamilies,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ff27225..37fcbe2 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -48,6 +48,8 @@ import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 import static org.apache.cassandra.db.commitlog.CommitLogSegment.*;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksum;
+import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 
 /*
  * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to
@@ -61,7 +63,7 @@ public class CommitLog implements CommitLogMBean
 
     // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly
     // empty segments when writing large records
-    private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1;
+    private final long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
 
     public final CommitLogSegmentManager allocator;
     public final CommitLogArchiver archiver;
@@ -254,9 +256,9 @@ public class CommitLog implements CommitLogMBean
     {
         assert mutation != null;
 
-        long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);
+        int size = (int) Mutation.serializer.serializedSize(mutation, MessagingService.current_version);
 
-        long totalSize = size + ENTRY_OVERHEAD_SIZE;
+        int totalSize = size + ENTRY_OVERHEAD_SIZE;
         if (totalSize > MAX_MUTATION_SIZE)
         {
             throw new IllegalArgumentException(String.format("Mutation of %s bytes is too large for the maxiumum size of %s",
@@ -269,19 +271,13 @@ public class CommitLog implements CommitLogMBean
         try (BufferedDataOutputStreamPlus dos = new DataOutputBufferFixed(buffer))
         {
             // checksummed length
-            dos.writeInt((int) size);
-
-            ByteBuffer copy = buffer.duplicate();
-            copy.position(buffer.position() - 4);
-            copy.limit(buffer.position());
-            checksum.update(copy);
+            dos.writeInt(size);
+            updateChecksumInt(checksum, size);
             buffer.putInt((int) checksum.getValue());
 
             // checksummed mutation
-            copy = buffer.duplicate();
             Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
-            copy.limit(copy.position() + (int) size);
-            checksum.update(copy);
+            updateChecksum(checksum, buffer, buffer.position() - size, size);
             buffer.putInt((int) checksum.getValue());
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 5fa402a..7aa604e 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -809,6 +809,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
+    public boolean valuesEqual(InetAddress ep1, InetAddress ep2, ApplicationState as)
+    {
+        EndpointState state1 = getEndpointStateForEndpoint(ep1);
+        EndpointState state2 = getEndpointStateForEndpoint(ep2);
+
+        if (state1 == null || state2 == null)
+            return false;
+
+        VersionedValue value1 = state1.getApplicationState(as);
+        VersionedValue value2 = state2.getApplicationState(as);
+
+        return !(value1 == null || value2 == null) && value1.value.equals(value2.value);
+    }
+
     // removes ALL endpoint states; should only be called after shadow gossip
     public void resetEndpointStateMap()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
new file mode 100644
index 0000000..fa727bc
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.util.zip.CRC32;
+
+import org.apache.cassandra.io.util.AbstractDataInput;
+
+/**
+ * An {@link AbstractDataInput} wrapper that calculates the CRC in place.
+ *
+ * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
+ * to allocate an extra byte array just for that purpose.
+ *
+ * In addition to calculating the CRC, allows enforcing a maximum known size. This is needed
+ * so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
+ * corrupted sequence by reading a huge corrupted length of bytes
+ * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
+ */
+public final class ChecksummedDataInput extends AbstractDataInput
+{
+    private final CRC32 crc;
+    private final AbstractDataInput source;
+    private int limit;
+
+    private ChecksummedDataInput(AbstractDataInput source)
+    {
+        this.source = source;
+
+        crc = new CRC32();
+        limit = Integer.MAX_VALUE;
+    }
+
+    public static ChecksummedDataInput wrap(AbstractDataInput source)
+    {
+        return new ChecksummedDataInput(source);
+    }
+
+    public void resetCrc()
+    {
+        crc.reset();
+    }
+
+    public void resetLimit()
+    {
+        limit = Integer.MAX_VALUE;
+    }
+
+    public void limit(int newLimit)
+    {
+        limit = newLimit;
+    }
+
+    public int bytesRemaining()
+    {
+        return limit;
+    }
+
+    public int getCrc()
+    {
+        return (int) crc.getValue();
+    }
+
+    public void seek(long position) throws IOException
+    {
+        source.seek(position);
+    }
+
+    public long getPosition()
+    {
+        return source.getPosition();
+    }
+
+    public long getPositionLimit()
+    {
+        return source.getPositionLimit();
+    }
+
+    public int read() throws IOException
+    {
+        int b = source.read();
+        crc.update(b);
+        limit--;
+        return b;
+    }
+
+    @Override
+    public int read(byte[] buff, int offset, int length) throws IOException
+    {
+        if (length > limit)
+            throw new IOException("Digest mismatch exception");
+
+        int copied = source.read(buff, offset, length);
+        crc.update(buff, offset, copied);
+        limit -= copied;
+        return copied;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncodedHintMessage.java b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
new file mode 100644
index 0000000..2797495
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/EncodedHintMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * A specialized version of {@link HintMessage} that takes an already encoded in a bytebuffer hint and sends it verbatim.
+ *
+ * An optimization for when dispatching a hint file of the current messaging version to a node of the same messaging version,
+ * which is the most common case. Saves on extra ByteBuffer allocations and one redundant hint deserialization-serialization cycle.
+ *
+ * Never deserialized as an EncodedHintMessage - the receiving side will always deserialize the message as vanilla
+ * {@link HintMessage}.
+ */
+final class EncodedHintMessage
+{
+    private static final IVersionedSerializer<EncodedHintMessage> serializer = new Serializer();
+
+    private final UUID hostId;
+    private final ByteBuffer hint;    // the hint, already serialized at exactly this.version
+    private final int version;        // the messaging version the buffer was encoded with
+
+    EncodedHintMessage(UUID hostId, ByteBuffer hint, int version)
+    {
+        this.hostId = hostId;
+        this.hint = hint;
+        this.version = version;
+    }
+
+    MessageOut<EncodedHintMessage> createMessageOut()
+    {
+        return new MessageOut<>(MessagingService.Verb.HINT, this, serializer);
+    }
+
+    private static class Serializer implements IVersionedSerializer<EncodedHintMessage>
+    {
+        public long serializedSize(EncodedHintMessage message, int version)
+        {
+            // the pre-encoded buffer is only valid for the version it was encoded with
+            if (version != message.version)
+                throw new IllegalArgumentException("serializedSize() called with non-matching version " + version);
+
+            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
+            size += TypeSizes.sizeof(message.hint.remaining());
+            size += message.hint.remaining();
+            return size;
+        }
+
+        public void serialize(EncodedHintMessage message, DataOutputPlus out, int version) throws IOException
+        {
+            if (version != message.version)
+                throw new IllegalArgumentException("serialize() called with non-matching version " + version);
+
+            // mirrors HintMessage.Serializer wire format: host id, then hint size, then the hint bytes
+            UUIDSerializer.serializer.serialize(message.hostId, out, version);
+            out.writeInt(message.hint.remaining());
+            out.write(message.hint);
+        }
+
+        public EncodedHintMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            // see class javadoc: the receiving side always deserializes a vanilla HintMessage instead
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/Hint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/Hint.java b/src/java/org/apache/cassandra/hints/Hint.java
new file mode 100644
index 0000000..d8f85c5
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/Hint.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * Encapsulates the hinted mutation, its creation time, and the gc grace seconds param for each table involved.
+ *
+ * - Why do we need to track hint creation time?
+ * - We must exclude updates for tables that have been truncated after hint's creation, otherwise the result is data corruption.
+ *
+ * - Why do we need to track gc grace seconds?
+ * - Hints can stay in storage for a while before being applied, and without recording gc grace seconds (+ creation time),
+ *   if we apply the mutation blindly, we risk resurrecting a deleted value, a tombstone for which had been already
+ *   compacted away while the hint was in storage.
+ *
+ *   We also look at the smallest current value of the gcgs param for each affected table when applying the hint, and use
+ *   creation time + min(recorded gcgs, smallest current gcgs) as the overall hint expiration time.
+ *   This allows us to safely reduce gcgs on tables without worrying that an applied old hint might resurrect any data.
+ */
+public final class Hint
+{
+    public static final Serializer serializer = new Serializer();
+
+    final Mutation mutation;
+    final long creationTime;  // time of hint creation (in milliseconds since epoch)
+    final int gcgs; // the smallest gc grace seconds of all tables involved at the time of hint creation
+
+    private Hint(Mutation mutation, long creationTime, int gcgs)
+    {
+        this.mutation = mutation;
+        this.creationTime = creationTime;
+        this.gcgs = gcgs;
+    }
+
+    /**
+     * @param mutation the hinted mutation
+     * @param creationTime time of this hint's creation (in milliseconds since epoch)
+     */
+    public static Hint create(Mutation mutation, long creationTime)
+    {
+        return new Hint(mutation, creationTime, mutation.smallestGCGS());
+    }
+
+    /**
+     * @param mutation the hinted mutation
+     * @param creationTime time of this hint's creation (in milliseconds since epoch)
+     * @param gcgs the smallest gcgs of all tables involved at the time of hint creation (in seconds)
+     */
+    public static Hint create(Mutation mutation, long creationTime, int gcgs)
+    {
+        return new Hint(mutation, creationTime, gcgs);
+    }
+
+    /**
+     * Applies the contained mutation unless it's expired, filtering out any updates for truncated tables
+     */
+    void apply()
+    {
+        if (!isLive())
+            return;
+
+        // filter out partition updates for any table that has been truncated since the hint's creation
+        Mutation filtered = mutation;
+        for (UUID id : mutation.getColumnFamilyIds())
+            if (creationTime <= SystemKeyspace.getTruncatedAt(id))
+                filtered = filtered.without(id);
+
+        if (!filtered.isEmpty())
+            filtered.apply();
+    }
+
+    /**
+     * @return whether it is still safe to apply the hint without risking resurrecting any deleted data
+     */
+    boolean isLive()
+    {
+        // expiration = creation time + min(gcgs recorded at creation, smallest current gcgs of the involved tables)
+        int smallestGCGS = Math.min(gcgs, mutation.smallestGCGS());
+        long expirationTime = creationTime + TimeUnit.SECONDS.toMillis(smallestGCGS);
+        return expirationTime > System.currentTimeMillis();
+    }
+
+    static final class Serializer implements IVersionedSerializer<Hint>
+    {
+        // wire/disk order: creationTime (long), gcgs (int), then the serialized mutation
+        public long serializedSize(Hint hint, int version)
+        {
+            long size = TypeSizes.sizeof(hint.creationTime);
+            size += TypeSizes.sizeof(hint.gcgs);
+            size += Mutation.serializer.serializedSize(hint.mutation, version);
+            return size;
+        }
+
+        public void serialize(Hint hint, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeLong(hint.creationTime);
+            out.writeInt(hint.gcgs);
+            Mutation.serializer.serialize(hint.mutation, out, version);
+        }
+
+        public Hint deserialize(DataInputPlus in, int version) throws IOException
+        {
+            long creationTime = in.readLong();
+            int gcgs = in.readInt();
+            return new Hint(Mutation.serializer.deserialize(in, version), creationTime, gcgs);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintMessage.java b/src/java/org/apache/cassandra/hints/HintMessage.java
new file mode 100644
index 0000000..89baa89
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintMessage.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.hints;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.UnknownColumnFamilyException;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.BytesReadTracker;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+/**
+ * The message we use to dispatch and forward hints.
+ *
+ * Encodes the host id the hint is meant for and the hint itself.
+ * We use the host id to determine whether we should store or apply the hint:
+ * 1. If host id equals to the receiving node host id, then we apply the hint
+ * 2. If host id is different from the receiving node's host id, then we store the hint
+ *
+ * Scenario (1) means that we are dealing with regular hint dispatch.
+ * Scenario (2) means that we got a hint from a node that's going through decommissioning and is streaming its hints
+ * elsewhere first.
+ */
+public final class HintMessage
+{
+    public static final IVersionedSerializer<HintMessage> serializer = new Serializer();
+
+    final UUID hostId;
+
+    @Nullable // can be null if we fail to decode the hint because of an unknown table id in it
+    final Hint hint;
+
+    @Nullable // will usually be null, unless a hint deserialization fails due to an unknown table id
+    final UUID unknownTableID;
+
+    HintMessage(UUID hostId, Hint hint)
+    {
+        this.hostId = hostId;
+        this.hint = hint;
+        this.unknownTableID = null;
+    }
+
+    HintMessage(UUID hostId, UUID unknownTableID)
+    {
+        this.hostId = hostId;
+        this.hint = null;
+        this.unknownTableID = unknownTableID;
+    }
+
+    public MessageOut<HintMessage> createMessageOut()
+    {
+        return new MessageOut<>(MessagingService.Verb.HINT, this, serializer);
+    }
+
+    public static class Serializer implements IVersionedSerializer<HintMessage>
+    {
+        public long serializedSize(HintMessage message, int version)
+        {
+            // we should never *send* a HintMessage with a null hint (matches the check in serialize() below);
+            // fail fast with a named NPE instead of an opaque one from Hint.serializer
+            Objects.requireNonNull(message.hint, "hint");
+
+            int size = (int) UUIDSerializer.serializer.serializedSize(message.hostId, version);
+
+            int hintSize = (int) Hint.serializer.serializedSize(message.hint, version);
+            size += TypeSizes.sizeof(hintSize);
+            size += hintSize;
+
+            return size;
+        }
+
+        public void serialize(HintMessage message, DataOutputPlus out, int version) throws IOException
+        {
+            Objects.requireNonNull(message.hint); // we should never *send* a HintMessage with null hint
+
+            UUIDSerializer.serializer.serialize(message.hostId, out, version);
+
+            /*
+             * We are serializing the hint size so that the receiver of the message can gracefully handle
+             * a deserialization failure when a table has been dropped, by simply skipping the unread bytes.
+             */
+            out.writeInt((int) Hint.serializer.serializedSize(message.hint, version));
+
+            Hint.serializer.serialize(message.hint, out, version);
+        }
+
+        /*
+         * It's not an exceptional scenario for a streamed hints file to contain partition updates for tables
+         * that don't exist anymore. We want to handle that case gracefully instead of dropping the connection for every
+         * one of them.
+         */
+        public HintMessage deserialize(DataInputPlus in, int version) throws IOException
+        {
+            UUID hostId = UUIDSerializer.serializer.deserialize(in, version);
+
+            int hintSize = in.readInt();
+            // track how many of the hint's bytes get consumed, so the remainder can be skipped on failure
+            BytesReadTracker countingIn = new BytesReadTracker(in);
+            try
+            {
+                return new HintMessage(hostId, Hint.serializer.deserialize(countingIn, version));
+            }
+            catch (UnknownColumnFamilyException e)
+            {
+                in.skipBytes(hintSize - (int) countingIn.getBytesRead());
+                return new HintMessage(hostId, e.cfId);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/96d41f0e/src/java/org/apache/cassandra/hints/HintResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintResponse.java b/src/java/org/apache/cassandra/hints/HintResponse.java
new file mode 100644
index 0000000..8aa888f
--- /dev/null
+++ b/src/java/org/apache/cassandra/hints/HintResponse.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.hints;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+
+/**
+ * An empty successful response to a HintMessage.
+ *
+ * Carries no payload, so a single shared instance (and a single pre-built MessageOut) is reused for all responses.
+ */
+public final class HintResponse
+{
+    public static final IVersionedSerializer<HintResponse> serializer = new Serializer();
+
+    static final HintResponse instance = new HintResponse();
+    static final MessageOut<HintResponse> message =
+        new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, instance, serializer);
+
+    private HintResponse()
+    {
+    }
+
+    private static final class Serializer implements IVersionedSerializer<HintResponse>
+    {
+        public long serializedSize(HintResponse response, int version)
+        {
+            return 0; // no payload
+        }
+
+        public void serialize(HintResponse response, DataOutputPlus out, int version)
+        {
+            // nothing to write - the response is empty by design
+        }
+
+        public HintResponse deserialize(DataInputPlus in, int version)
+        {
+            return instance; // nothing to read; return the shared singleton
+        }
+    }
+}


Mime
View raw message