cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Improve asynchronous hint delivery
Date Wed, 20 Mar 2013 17:09:44 GMT
Updated Branches:
  refs/heads/trunk 08721da28 -> 076a0e4e7


Improve asynchronous hint delivery

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-5179


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d224c2be
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d224c2be
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d224c2be

Branch: refs/heads/trunk
Commit: d224c2beb6cee67a4ab390026697a2fd7e8c7fd5
Parents: 87d9362
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Wed Mar 20 20:02:21 2013 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Wed Mar 20 20:02:21 2013 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/db/HintedHandOffManager.java  |  178 ++++++++-------
 2 files changed, 98 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d224c2be/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 174cca4..3eb85d6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -2,6 +2,7 @@
  * Fix mixing prepared statements between keyspaces (CASSANDRA-5352)
  * Fix consistency level during bootstrap - strike 3 (CASSANDRA-5354)
  * Fix transposed arguments in AlreadyExistsException (CASSANDRA-5362)
+ * Improve asynchronous hint delivery (CASSANDRA-5179)
 
 
 1.2.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d224c2be/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 58e6ef0..53411f5 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,13 +48,13 @@ import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.metrics.HintedHandoffMetrics;
-import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
@@ -61,7 +62,6 @@ import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 /**
@@ -107,7 +107,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                                                                                  Integer.MAX_VALUE,
                                                                                  TimeUnit.SECONDS,
                                                                                  new LinkedBlockingQueue<Runnable>(),
-                                                                                 new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY), "internal");
+                                                                                 new NamedThreadFactory("HintedHandoff", Thread.MIN_PRIORITY),
+                                                                                 "internal");
+
+    private final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
 
     public void start()
     {
@@ -133,7 +136,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
-    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp) throws IOException
+    private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer columnName, long timestamp)
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, tokenBytes);
         rm.delete(new QueryPath(SystemTable.HINTS_CF, null, columnName), timestamp);
@@ -149,8 +152,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         }
         catch (UnknownHostException e)
         {
-            logger.warn("Unable to find "+ipOrHostname+", not a hostname or ipaddr of a node?:");
-            e.printStackTrace();
+            logger.warn("Unable to find {}, not a hostname or ipaddr of a node", ipOrHostname);
             throw new RuntimeException(e);
         }
     }
@@ -171,13 +173,13 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             {
                 try
                 {
-                    logger.info("Deleting any stored hints for " + endpoint);
+                    logger.info("Deleting any stored hints for {}", endpoint);
                     rm.apply();
                     compact();
                 }
                 catch (Exception e)
                 {
-                    logger.warn("Could not delete hints for " + endpoint + ": " + e);
+                    logger.warn("Could not delete hints for {}: {}", endpoint, e);
                 }
             }
         };
@@ -187,7 +189,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     @VisibleForTesting
     protected Future<?> compact() throws ExecutionException, InterruptedException
     {
-        final ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
         hintStore.forceBlockingFlush();
         ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
         for (SSTable sstable : hintStore.getSSTables())
@@ -245,21 +246,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         return waited;
     }
 
-    private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
-    {
-        try
-        {
-            deliverHintsToEndpointInternal(endpoint);
-        }
-        finally
-        {
-            queuedDeliveries.remove(endpoint);
-        }
-    }
-
-    private void deliverHintsToEndpointInternal(InetAddress endpoint) throws IOException, DigestMismatchException, InvalidRequestException, InterruptedException
+    private void deliverHintsToEndpoint(InetAddress endpoint)
     {
-        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
         if (hintStore.isEmpty())
             return; // nothing to do, don't confuse users by logging a no-op handoff
 
@@ -286,12 +274,18 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             return;
         }
 
-        // 1. Get the key of the endpoint we need to handoff
-        // 2. For each column, deserialize the mutation and send it to the endpoint
-        // 3. Delete the subcolumn if the write was successful
-        // 4. Force a flush
-        // 5. Do major compaction to clean up all deletes etc.
+        doDeliverHintsToEndpoint(endpoint);
+    }
 
+    /*
+     * 1. Get the key of the endpoint we need to handoff
+     * 2. For each column, deserialize the mutation and send it to the endpoint
+     * 3. Delete the subcolumn if the write was successful
+     * 4. Force a flush
+     * 5. Do major compaction to clean up all deletes etc.
+     */
+    private void doDeliverHintsToEndpoint(InetAddress endpoint)
+    {
         // find the hints for the node using its token.
         UUID hostId = Gossiper.instance.getHostId(endpoint);
         logger.info("Started hinted handoff for host: {} with IP: {}", hostId, endpoint);
@@ -301,50 +295,47 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         final AtomicInteger rowsReplayed = new AtomicInteger(0);
         ByteBuffer startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
-        int pageSize = PAGE_SIZE;
-        // read less columns (mutations) per page if they are very large
-        if (hintStore.getMeanColumns() > 0)
-        {
-            int averageColumnSize = (int) (hintStore.getMeanRowSize() / hintStore.getMeanColumns());
-            pageSize = Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
-            pageSize = Math.max(2, pageSize); // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
-            logger.debug("average hinted-row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
-        }
+        int pageSize = calculatePageSize();
+        logger.debug("Using pageSize of {}", pageSize);
 
         // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
         int throttleInKB = DatabaseDescriptor.getHintedHandoffThrottleInKB();
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
+        delivery:
         while (true)
         {
-            // check if hints delivery has been paused during the process
-            if (hintedHandOffPaused)
-            {
-                logger.debug("Hints delivery process is paused, aborting");
-                break;
-            }
+            QueryFilter filter = QueryFilter.getSliceFilter(epkey,
+                                                            new QueryPath(SystemTable.HINTS_CF),
+                                                            startColumn,
+                                                            ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                            false,
+                                                            pageSize);
+
+            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
+                                                                     (int) (System.currentTimeMillis() / 1000));
 
-            QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(SystemTable.HINTS_CF), startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, pageSize);
-            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter), (int)(System.currentTimeMillis() / 1000));
             if (pagingFinished(hintsPage, startColumn))
-            {
-                if (ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(startColumn))
-                {
-                    // we've started from the beginning and could not find anything (only maybe tombstones)
-                    break;
-                }
-                else
-                {
-                    // restart query from the first column until we read an empty row;
-                    // that will tell us everything was delivered successfully with no timeouts
-                    startColumn = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-                    continue;
-                }
+                break;
 
+            // check if node is still alive and we should continue delivery process
+            if (!FailureDetector.instance.isAlive(endpoint))
+            {
+                logger.info("Endpoint {} died during hint delivery; aborting ({} delivered)", endpoint, rowsReplayed);
+                return;
             }
 
+            List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
+
             for (final IColumn hint : hintsPage.getSortedColumns())
             {
+                // check if hints delivery has been paused during the process
+                if (hintedHandOffPaused)
+                {
+                    logger.debug("Hints delivery process is paused, aborting");
+                    break delivery;
+                }
+
                 // Skip tombstones:
                 // if we iterate quickly enough, it's possible that we could request a new page in the same millisecond
                 // in which the local deletion timestamp was generated on the last column in the old page, in which
@@ -353,11 +344,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 if (!hint.isLive())
                     continue;
 
-                if (hintedHandOffPaused)
-                {
-                    logger.debug("Hints delivery process is paused, aborting");
-                    break;
-                }
                 startColumn = hint.name();
 
                 ByteBuffer[] components = comparator.split(hint.name());
@@ -374,29 +360,42 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                     deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                     continue;
                 }
+                catch (IOException e)
+                {
+                    throw new AssertionError(e);
+                }
 
                 MessageOut<RowMutation> message = rm.createMessage();
                 rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
-                WrappedRunnable callback = new WrappedRunnable()
+                Runnable callback = new Runnable()
                 {
-                    public void runMayThrow() throws IOException
+                    public void run()
                     {
                         rowsReplayed.incrementAndGet();
                         deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
                     }
                 };
-                IAsyncCallback responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
+                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH, callback);
                 MessagingService.instance().sendRR(message, endpoint, responseHandler);
+                responseHandlers.add(responseHandler);
             }
 
-            // check if node is still alive and we should continue delivery process
-            if (!FailureDetector.instance.isAlive(endpoint))
+            for (WriteResponseHandler handler : responseHandlers)
             {
-                logger.debug("Endpoint {} died during hint delivery, aborting", endpoint);
-                return;
+                try
+                {
+                    handler.get();
+                }
+                catch (WriteTimeoutException e)
+                {
+                    logger.info("Timed out replaying hints to {}; aborting ({} delivered)", endpoint, rowsReplayed);
+                    return;
+                }
             }
         }
 
+        logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed, endpoint);
+
         try
         {
             compact().get();
@@ -405,11 +404,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         {
             throw new RuntimeException(e);
         }
+    }
 
-        logger.info(String.format("Finished hinted handoff of %s rows to endpoint %s", rowsReplayed, endpoint));
-        if (hintedHandOffPaused)
+    private int calculatePageSize()
+    {
+        // read less columns (mutations) per page if they are very large
+        if (hintStore.getMeanColumns() > 0)
         {
-            logger.info("Hints delivery process is paused, not delivering further hints");
+            int averageColumnSize = (int) (hintStore.getMeanRowSize() / hintStore.getMeanColumns());
+            // page size of 1 does not allow actual paging b/c of >= behavior on startColumn
+            return Math.max(2, Math.min(PAGE_SIZE, DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize));
+        }
+        else
+        {
+            return PAGE_SIZE;
         }
     }
 
@@ -422,7 +430,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         if (logger.isDebugEnabled())
           logger.debug("Started scheduleAllDeliveries");
 
-        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.HINTS_CF);
         IPartitioner p = StorageService.getPartitioner();
         RowPosition minPos = p.getMinimumToken().minKeyBound();
         Range<RowPosition> range = new Range<RowPosition>(minPos, minPos, p);
@@ -449,17 +456,25 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     public void scheduleHintDelivery(final InetAddress to)
     {
         // We should not deliver hints to the same host in 2 different threads
-        if (queuedDeliveries.contains(to) || !queuedDeliveries.add(to))
+        if (!queuedDeliveries.add(to))
             return;
+
         logger.debug("Scheduling delivery of Hints to {}", to);
-        Runnable r = new WrappedRunnable()
+
+        executor.execute(new Runnable()
         {
-            public void runMayThrow() throws Exception
+            public void run()
             {
-                deliverHintsToEndpoint(to);
+                try
+                {
+                    deliverHintsToEndpoint(to);
+                }
+                finally
+                {
+                    queuedDeliveries.remove(to);
+                }
             }
-        };
-        executor.execute(r);
+        });
     }
 
     public void scheduleHintDelivery(String to) throws UnknownHostException
@@ -467,7 +482,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         scheduleHintDelivery(InetAddress.getByName(to));
     }
 
-    public void pauseHintsDelivery(boolean b) {
+    public void pauseHintsDelivery(boolean b)
+    {
         hintedHandOffPaused = b;
     }
 


Mime
View raw message