cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r951246 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/service/
Date Fri, 04 Jun 2010 02:49:34 GMT
Author: jbellis
Date: Fri Jun  4 02:49:33 2010
New Revision: 951246

URL: http://svn.apache.org/viewvc?rev=951246&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-951242
+/cassandra/branches/cassandra-0.6:922689-951245
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-951242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-951245
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-951242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-951245
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-951242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-951245
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-951242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-951245
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun  4 02:49:33 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-951242
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-951245
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=951246&r1=951245&r2=951246&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jun  4 02:49:33 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.db;
 
+import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
@@ -83,27 +84,11 @@ public class HintedHandOffManager
     public static final HintedHandOffManager instance = new HintedHandOffManager();
 
     private static final Logger logger_ = LoggerFactory.getLogger(HintedHandOffManager.class);
-    final static long INTERVAL_IN_MS = 3600 * 1000; // check for ability to deliver hints
this often
     public static final String HINTS_CF = "HintsColumnFamily";
     private static final int PAGE_SIZE = 10000;
 
     private final ExecutorService executor_ = new JMXEnabledThreadPoolExecutor("HINTED-HANDOFF-POOL");
 
-    protected HintedHandOffManager()
-    {
-        new Thread(new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                while (true)
-                {
-                    Thread.sleep(INTERVAL_IN_MS);
-                    deliverAllHints();
-                }
-            }
-        }, "Hint delivery").start();
-    }
-
     private static boolean sendMessage(InetAddress endpoint, String tableName, byte[] key)
throws IOException
     {
         if (!Gossiper.instance.isKnownEndpoint(endpoint))
@@ -154,68 +139,6 @@ public class HintedHandOffManager
         rm.apply();
     }
 
-    /** hintStore must be the hints columnfamily from the system table */
-    private static void deliverAllHints() throws DigestMismatchException, IOException, InvalidRequestException,
TimeoutException
-    {
-        if (logger_.isDebugEnabled())
-          logger_.debug("Started deliverAllHints");
-
-        // 1. Scan through all the keys that we need to handoff
-        // 2. For each key read the list of recipients and send
-        // 3. Delete that recipient from the key if write was successful
-        // 4. If all writes were success for a given key we can even delete the key .
-        // 5. Now force a flush
-        // 6. Do major compaction to clean up all deletes etc.
-        // 7. I guess we are done
-        ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
-        for (String tableName : DatabaseDescriptor.getTables())
-        {
-            DecoratedKey tableNameKey = StorageService.getPartitioner().decorateKey(tableName.getBytes(UTF8));
-            byte[] startColumn = ArrayUtils.EMPTY_BYTE_ARRAY;
-            while (true)
-            {
-                QueryFilter filter = QueryFilter.getSliceFilter(tableNameKey, new QueryPath(HINTS_CF),
startColumn, ArrayUtils.EMPTY_BYTE_ARRAY, null, false, PAGE_SIZE);
-                ColumnFamily hintColumnFamily = ColumnFamilyStore.removeDeleted(hintStore.getColumnFamily(filter),
Integer.MAX_VALUE);
-                if (pagingFinished(hintColumnFamily, startColumn))
-                    break;
-                Collection<IColumn> keys = hintColumnFamily.getSortedColumns();
-
-                for (IColumn keyColumn : keys)
-                {
-                    Collection<IColumn> endpoints = keyColumn.getSubColumns();
-                    byte[] keyBytes = keyColumn.name();
-                    int deleted = 0;
-                    for (IColumn endpoint : endpoints)
-                    {
-                        if (sendMessage(InetAddress.getByAddress(endpoint.name()), tableName,
keyBytes))
-                        {
-                            deleteEndpoint(endpoint.name(), tableName, keyColumn.name(),
System.currentTimeMillis());
-                            deleted++;
-                        }
-                    }
-                    if (deleted == endpoints.size())
-                    {
-                        deleteHintKey(tableName, keyColumn.name());
-                    }
-
-                    startColumn = keyColumn.name();
-                }
-            }
-        }
-        hintStore.forceFlush();
-        try
-        {
-            CompactionManager.instance.submitMajor(hintStore).get();
-        }
-        catch (Exception e)
-        {
-            throw new RuntimeException(e);
-        }
-
-        if (logger_.isDebugEnabled())
-          logger_.debug("Finished deliverAllHints");
-    }
-
     private static boolean pagingFinished(ColumnFamily hintColumnFamily, byte[] startColumn)
     {
         // done if no hints found or the start column (same as last column processed in previous
iteration) is the only one
@@ -232,6 +155,8 @@ public class HintedHandOffManager
         // 1. Scan through all the keys that we need to handoff
         // 2. For each key read the list of recipients if the endpoint matches send
         // 3. Delete that recipient from the key if write was successful
+        // 4. Now force a flush
+        // 5. Do major compaction to clean up all deletes etc.
         ColumnFamilyStore hintStore = Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HINTS_CF);
         for (String tableName : DatabaseDescriptor.getTables())
         {
@@ -265,6 +190,15 @@ public class HintedHandOffManager
                 }
             }
         }
+        hintStore.forceFlush();
+        try
+        {
+            CompactionManager.instance.submitMajor(hintStore, 0, Integer.MAX_VALUE).get();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
 
         if (logger_.isDebugEnabled())
           logger_.debug("Finished hinted handoff for endpoint " + endpoint);
@@ -317,4 +251,9 @@ public class HintedHandOffManager
         };
     	executor_.submit(r);
     }
+
+    public void deliverHints(String to) throws UnknownHostException
+    {
+        deliverHints(InetAddress.getByName(to));
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=951246&r1=951245&r2=951246&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Jun  4 02:49:33 2010
@@ -22,6 +22,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Constructor;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -956,6 +957,11 @@ public class StorageService implements I
         HintedHandOffManager.instance.deliverHints(endpoint);
     }
 
+    public final void deliverHints(String host) throws UnknownHostException
+    {
+        HintedHandOffManager.instance.deliverHints(host);
+    }
+
     public Token getLocalToken()
     {
         return storageMetadata_.getToken();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java?rev=951246&r1=951245&r2=951246&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageServiceMBean.java Fri Jun  4 02:49:33 2010
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -191,4 +192,7 @@ public interface StorageServiceMBean
      * @throws UnavailableException if some of the hosts in the ring are down.
      */
     public void truncate(String keyspace, String columnFamily) throws UnavailableException,
TimeoutException, IOException;
+
+    /** force hint delivery to an endpoint **/
+    public void deliverHints(String host) throws UnknownHostException;
 }



Mime
View raw message