cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1222467 - in /cassandra/branches/cassandra-1.0: ./ conf/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
Date Thu, 22 Dec 2011 21:34:25 GMT
Author: jbellis
Date: Thu Dec 22 21:34:24 2011
New Revision: 1222467

URL: http://svn.apache.org/viewvc?rev=1222467&view=rev
Log:
attempt hint delivery every ten minutes; hint handoff throttle delay default changed to 1ms,
from 50
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-3554

Modified:
    cassandra/branches/cassandra-1.0/CHANGES.txt
    cassandra/branches/cassandra-1.0/conf/cassandra.yaml
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java

Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Thu Dec 22 21:34:24 2011
@@ -1,4 +1,7 @@
 1.0.7
+ * attempt hint delivery every ten minutes, or when failure detector
+   notifies us that a node is back up, whichever comes first.  hint
+   handoff throttle delay default changed to 1ms, from 50 (CASSANDRA-3554)
  * add nodetool setstreamthroughput (CASSANDRA-3571)
  * fix assertion when dropping a columnfamily with no sstables (CASSANDRA-3614)
  * more efficient allocation of small bloom filters (CASSANDRA-3618)

Modified: cassandra/branches/cassandra-1.0/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/conf/cassandra.yaml?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/conf/cassandra.yaml (original)
+++ cassandra/branches/cassandra-1.0/conf/cassandra.yaml Thu Dec 22 21:34:24 2011
@@ -26,8 +26,8 @@ hinted_handoff_enabled: true
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, hints will be dropped.
 max_hint_window_in_ms: 3600000 # one hour
-# Sleep this long after delivering each row or row fragment
-hinted_handoff_throttle_delay_in_ms: 50
+# Sleep this long after delivering each hint
+hinted_handoff_throttle_delay_in_ms: 1
 
 # authentication backend, implementing IAuthenticator; used to identify users
 authenticator: org.apache.cassandra.auth.AllowAllAuthenticator

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Dec 22 21:34:24 2011
@@ -1313,7 +1313,6 @@ public class ColumnFamilyStore implement
       * @return true if we found all keys we were looking for, otherwise false
      */
     public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds range,
int maxResults, IFilter columnFilter)
-    throws ExecutionException, InterruptedException
     {
         assert range instanceof Bounds
                || (!((Range)range).isWrapAround() || range.right.equals(StorageService.getPartitioner().getMinimumToken()))

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManager.java
Thu Dec 22 21:34:24 2011
@@ -20,23 +20,26 @@ package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.collect.ImmutableSortedSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
@@ -61,7 +64,7 @@ import org.cliffc.high_scale_lib.NonBloc
  * (We have to use String keys for compatibility with OPP.)
  * SuperColumns in these rows are the mutations to replay, with uuid names:
  *
- *  <dest ip>: {              // key
+ *  <dest token>: {           // key
  *    <uuid>: {               // supercolumn
  *      mutation: <mutation>  // subcolumn
  *      version: <mutation serialization version>
@@ -96,7 +99,7 @@ public class HintedHandOffManager implem
 
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HintedHandoff",
Thread.MIN_PRIORITY);
 
-    public HintedHandOffManager()
+    public void start()
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -107,25 +110,23 @@ public class HintedHandOffManager implem
         {
             throw new RuntimeException(e);
         }
-    }
-    public void registerMBean()
-    {
         logger_.debug("Created HHOM instance, registered MBean.");
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                scheduleAllDeliveries();
+            }
+        };
+        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
-    private static boolean sendMutation(InetAddress endpoint, RowMutation mutation) throws
IOException
+    private static void sendMutation(InetAddress endpoint, RowMutation mutation) throws TimeoutException
     {
         IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
         MessagingService.instance().sendRR(mutation, endpoint, responseHandler);
-
-        try
-        {
-            responseHandler.get();
-        }
-        catch (TimeoutException e)
-        {
-            return false;
-        }
+        responseHandler.get();
 
         try
         {
@@ -135,8 +136,6 @@ public class HintedHandOffManager implem
         {
             throw new AssertionError(e);
         }
-
-        return true;
     }
 
     private static void deleteHint(ByteBuffer tokenBytes, ByteBuffer hintId, long timestamp)
throws IOException
@@ -226,7 +225,7 @@ public class HintedHandOffManager implem
         logger_.debug("schema for {} matches local schema", endpoint);
         return waited;
     }
-            
+
     private void deliverHintsToEndpoint(InetAddress endpoint) throws IOException, DigestMismatchException,
InvalidRequestException, TimeoutException, InterruptedException
     {
         ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
@@ -275,12 +274,12 @@ public class HintedHandOffManager implem
         while (true)
         {
             QueryFilter filter = QueryFilter.getSliceFilter(epkey, new QueryPath(HINTS_CF),
startColumn, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, PAGE_SIZE);
-            ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
Integer.MAX_VALUE);
-            if (pagingFinished(hintColumnFamily, startColumn))
+            ColumnFamily hintsPage = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
Integer.MAX_VALUE);
+            if (pagingFinished(hintsPage, startColumn))
                 break;
 
             page:
-            for (IColumn hint : hintColumnFamily.getSortedColumns())
+            for (IColumn hint : hintsPage.getSortedColumns())
             {
                 startColumn = hint.name();
                 for (IColumn subColumn : hint.getSubColumns())
@@ -305,14 +304,15 @@ public class HintedHandOffManager implem
                 DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(mutationColumn.value()));
                 RowMutation rm = RowMutation.serializer().deserialize(in, ByteBufferUtil.toInt(versionColumn.value()));
 
-                if (sendMutation(endpoint, rm))
+                try
                 {
+                    sendMutation(endpoint, rm);
                     deleteHint(tokenBytes, hint.name(), hint.maxTimestamp());
                     rowsReplayed++;
                 }
-                else
+                catch (TimeoutException e)
                 {
-                    logger_.info("Could not complete hinted handoff to " + endpoint);
+                    logger_.info(String.format("Timed out replaying hints to %s; aborting
further deliveries", endpoint));
                     break delivery;
                 }
             }
@@ -335,12 +335,37 @@ public class HintedHandOffManager implem
                                    rowsReplayed, endpoint));
     }
 
+    /**
+     * Attempt delivery to any node for which we have hints.  Necessary since we can generate
hints even for
+     * nodes which are never officially down/failed.
+     */
+    private void scheduleAllDeliveries()
+    {
+        if (logger_.isDebugEnabled())
+          logger_.debug("Started scheduleAllDeliveries");
+
+        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken(), p);
+        IFilter filter = new NamesQueryFilter(ImmutableSortedSet.<ByteBuffer>of());
+        List<Row> rows = hintStore.getRangeSlice(null, range, Integer.MAX_VALUE, filter);
+        for (Row row : rows)
+        {
+            Token<?> token = StorageService.getPartitioner().getTokenFactory().fromByteArray(row.key.key);
+            InetAddress target = StorageService.instance.getTokenMetadata().getEndpoint(token);
+            scheduleHintDelivery(target);
+        }
+
+        if (logger_.isDebugEnabled())
+          logger_.debug("Finished scheduleAllDeliveries");
+    }
+
     /*
      * This method is used to deliver hints to a particular endpoint.
      * When we learn that some endpoint is back up we deliver the data
      * to him via an event driven mechanism.
     */
-    public void deliverHints(final InetAddress to)
+    public void scheduleHintDelivery(final InetAddress to)
     {
         logger_.debug("deliverHints to {}", to);
         if (!queuedDeliveries.add(to))
@@ -356,9 +381,9 @@ public class HintedHandOffManager implem
     	executor_.execute(r);
     }
 
-    public void deliverHints(String to) throws UnknownHostException
+    public void scheduleHintDelivery(String to) throws UnknownHostException
     {
-        deliverHints(InetAddress.getByName(to));
+        scheduleHintDelivery(InetAddress.getByName(to));
     }
 
     public List<String> listEndpointsPendingHints()

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/HintedHandOffManagerMBean.java
Thu Dec 22 21:34:24 2011
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 
@@ -25,9 +26,9 @@ public interface HintedHandOffManagerMBe
 {
     /**
      * Nuke all hints from this node to `ep`.
-     * @param epaddr String rep. of endpoint address to delete hints for, either ip address
("127.0.0.1") or hostname
+     * @param host String rep. of endpoint address to delete hints for, either ip address
("127.0.0.1") or hostname
      */
-    public void deleteHintsForEndpoint(final String epaddr);
+    public void deleteHintsForEndpoint(final String host);
 
     /**
      * List all the endpoints that this node has hints for.
@@ -42,5 +43,8 @@ public interface HintedHandOffManagerMBe
      * @return map of endpoint -> hint count
      */
     public Map<String, Integer> countPendingHints();
+
+    /** force hint delivery to an endpoint **/
+    public void scheduleHintDelivery(String host) throws UnknownHostException;
 }
 

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/db/RowMutation.java Thu
Dec 22 21:34:24 2011
@@ -112,7 +112,7 @@ public class RowMutation implements IMut
      * The format is the following:
      *
      * HintsColumnFamily: {        // cf
-     *   <dest ip>: {              // key
+     *   <dest token>: {           // key
      *     <uuid>: {               // super-column
      *       table: <table>        // columns
      *       key: <key>

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageProxy.java
Thu Dec 22 21:34:24 2011
@@ -804,21 +804,10 @@ public class StorageProxy implements Sto
                     if (logger.isDebugEnabled())
                         logger.debug("local range slice");
                     ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-                    try
-                    {
-                        rows.addAll(cfs.getRangeSlice(command.super_column,
-                                                    range,
-                                                    command.max_keys,
-                                                    QueryFilter.getFilter(command.predicate,
cfs.getComparator())));
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw new RuntimeException(e.getCause());
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new AssertionError(e);
-                    }
+                    rows.addAll(cfs.getRangeSlice(command.super_column,
+                                                range,
+                                                command.max_keys,
+                                                QueryFilter.getFilter(command.predicate,
cfs.getComparator())));
                 }
                 else
                 {

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageService.java
Thu Dec 22 21:34:24 2011
@@ -510,7 +510,7 @@ public class StorageService implements I
         MigrationManager.passiveAnnounce(Schema.instance.getVersion());
         Gossiper.instance.addLocalApplicationState(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
 
-        HintedHandOffManager.instance.registerMBean();
+        HintedHandOffManager.instance.start();
 
         if (DatabaseDescriptor.isAutoBootstrap()
                 && DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
@@ -1465,7 +1465,7 @@ public class StorageService implements I
     public void onAlive(InetAddress endpoint, EndpointState state)
     {
         if (!isClientMode && getTokenMetadata().isMember(endpoint))
-            deliverHints(endpoint);
+            HintedHandOffManager.instance.scheduleHintDelivery(endpoint);
     }
 
     public void onRemove(InetAddress endpoint)
@@ -1516,18 +1516,9 @@ public class StorageService implements I
         return map;
     }
 
-    /**
-     * Deliver hints to the specified node when it has crashed
-     * and come back up/ marked as alive after a network partition
-    */
-    public final void deliverHints(InetAddress endpoint)
-    {
-        HintedHandOffManager.instance.deliverHints(endpoint);
-    }
-
     public final void deliverHints(String host) throws UnknownHostException
     {
-        HintedHandOffManager.instance.deliverHints(host);
+        HintedHandOffManager.instance.scheduleHintDelivery(host);
     }
 
     public Token getLocalToken()

Modified: cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=1222467&r1=1222466&r2=1222467&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
(original)
+++ cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
Thu Dec 22 21:34:24 2011
@@ -300,9 +300,6 @@ public interface StorageServiceMBean
      */
     public void truncate(String keyspace, String columnFamily) throws UnavailableException,
TimeoutException, IOException;
 
-    /** force hint delivery to an endpoint **/
-    public void deliverHints(String host) throws UnknownHostException;
-
     /** save row and key caches */
     public void saveCaches() throws ExecutionException, InterruptedException;
 



Mime
View raw message