cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject svn commit: r1163685 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassand...
Date Wed, 31 Aug 2011 16:24:13 GMT
Author: slebresne
Date: Wed Aug 31 16:24:13 2011
New Revision: 1163685

URL: http://svn.apache.org/viewvc?rev=1163685&view=rev
Log:
merge from 0.8

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
    cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
    cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7:1026516-1163266
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0:1125021-1130369
 /cassandra/branches/cassandra-0.8.1:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 31 16:24:13 2011
@@ -96,6 +96,8 @@
  * fix ip address String representation in the ring cache (CASSANDRA-3044)
  * fix ring cache compatibility when mixing pre-0.8.4 nodes with post-
    in the same cluster (CASSANDRA-3023)
+ * make repair report failure when a node participating dies (instead of
+   hanging forever) (CASSANDRA-2433)
 
 0.8.4
  * include files-to-be-streamed in StreamInSession.getSources (CASSANDRA-2972)

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
 /cassandra/branches/cassandra-0.7/contrib:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Aug 31 16:24:13 2011
@@ -1,7 +1,7 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1131291
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1163266
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1163272
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1163272,1163677
 /cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
 /cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Wed Aug 31 16:24:13 2011
@@ -180,7 +180,7 @@ public class FailureDetector implements 
         {     
             for ( IFailureDetectionEventListener listener : fdEvntListeners_ )
             {
-                listener.convict(ep);
+                listener.convict(ep, phi);
             }
         }        
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Wed Aug 31 16:24:13 2011
@@ -262,7 +262,7 @@ public class Gossiper implements IFailur
      *
      * param @ endpoint end point that is convicted.
     */
-    public void convict(InetAddress endpoint)
+    public void convict(InetAddress endpoint, double phi)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState.isAlive())
@@ -735,12 +735,11 @@ public class Gossiper implements IFailur
         if (logger.isTraceEnabled())
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
-        if (epState.isAlive())
-        {
-            // the node restarted before we ever marked it down, so we'll report it as dead briefly so maintenance like resetting the connection pool can occur 
-            for (IEndpointStateChangeSubscriber subscriber : subscribers)
-                subscriber.onDead(ep, epState);
-        }
+
+        // the node restarted: it is up to the subscriber to take whatever action is necessary
+        for (IEndpointStateChangeSubscriber subscriber : subscribers)
+            subscriber.onRestart(ep, epState);
+
         if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
             markAlive(ep, epState);
         else

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/IEndpointStateChangeSubscriber.java Wed Aug 31 16:24:13 2011
@@ -47,4 +47,12 @@ public interface IEndpointStateChangeSub
     public void onDead(InetAddress endpoint, EndpointState state);
 
     public void onRemove(InetAddress endpoint);
+
+    /**
+     * Called whenever a node is restarted.
+     * Note that there is no guarantee when that happens that the node was
+     * previously marked down. It will have only if {@code state.isAlive() == false}
+     * as {@code state} is from before the restarted node is marked up.
+     */
+    public void onRestart(InetAddress endpoint, EndpointState state);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetectionEventListener.java Wed Aug 31 16:24:13 2011
@@ -31,7 +31,7 @@ public interface IFailureDetectionEventL
     /**
      * Convict the specified endpoint.
      * @param ep endpoint to be convicted
+     * @param phi the value of phi with which ep was convicted
      */
-    public void convict(InetAddress ep);
-    
+    public void convict(InetAddress ep, double phi);
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/Ec2MultiRegionSnitch.java Wed Aug 31 16:24:13 2011
@@ -73,6 +73,12 @@ public class Ec2MultiRegionSnitch extend
     }
 
     @Override
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        // do nothing
+    }
+
+    @Override
     public void onRemove(InetAddress endpoint)
     {
         // do nothing.

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Wed Aug
31 16:24:13 2011
@@ -23,27 +23,26 @@ import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
 import com.google.common.base.Objects;
 
-import org.apache.cassandra.db.compaction.AbstractCompactedRow;
-import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
@@ -97,19 +96,19 @@ public class AntiEntropyService
     // singleton enforcement
     public static final AntiEntropyService instance = new AntiEntropyService();
 
-    // timeout for outstanding requests (48 hours)
-    public final static long REQUEST_TIMEOUT = 48*60*60*1000;
-
-    /**
-     * Map of outstanding sessions to requests. Once both trees reach the rendezvous, the local node
-     * will queue a Differencer to compare them.
-     *
-     * This map is only accessed from Stage.ANTIENTROPY, so it is not synchronized.
-     */
-    private final ExpiringMap<String, Map<TreeRequest, TreePair>> requests;
+    private static final ThreadPoolExecutor executor;
+    static
+    {
+        executor = new JMXConfigurableThreadPoolExecutor(4,
+                                                         60,
+                                                         TimeUnit.SECONDS,
+                                                         new LinkedBlockingQueue<Runnable>(),
+                                                         new NamedThreadFactory("AntiEntropySessions"),
+                                                         "internal");
+    }
 
     /**
-     * A map of repair session ids to a Queue of TreeRequests that have been performed since the session was started.
+     * A map of active sessions.
      */
     private final ConcurrentMap<String, RepairSession> sessions;
 
@@ -118,22 +117,24 @@ public class AntiEntropyService
      */
     protected AntiEntropyService()
     {
-        requests = new ExpiringMap<String, Map<TreeRequest, TreePair>>(REQUEST_TIMEOUT);
         sessions = new ConcurrentHashMap<String, RepairSession>();
     }
 
     /**
      * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
-     * TODO: Should add retries: if nodes go offline before they respond to the requests, this could block forever.
      */
-    public RepairSession getRepairSession(Range range, String tablename, String... cfnames)
+    public RepairFuture submitRepairSession(Range range, String tablename, String... cfnames)
     {
-        return new RepairSession(range, tablename, cfnames);
+        RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture();
+        executor.execute(futureTask);
+        return futureTask;
     }
 
-    RepairSession getArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
+    RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
     {
-        return new RepairSession(req, tablename, cfnames);
+        RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture();
+        executor.execute(futureTask);
+        return futureTask;
     }
 
     /**
@@ -588,24 +589,25 @@ public class AntiEntropyService
      * Triggers repairs with all neighbors for the given table, cfs and range.
      * Typical lifecycle is: start() then join(). Executed in client threads.
      */
-    class RepairSession extends Thread
+    class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
     {
+        private final String sessionName;
         private final String tablename;
         private final String[] cfnames;
-        private final ConcurrentHashMap<TreeRequest,Object> requests = new ConcurrentHashMap<TreeRequest,Object>();
         private final Range range;
-        private final Set<InetAddress> endpoints;
+        private volatile Exception exception;
+        private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
-        private CountDownLatch completedLatch;
+        private final Set<InetAddress> endpoints;
         final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+        final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String,
RepairJob>();
 
+        private final SimpleCondition completed = new SimpleCondition();
         public final Condition differencingDone = new SimpleCondition();
 
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
             this(req.sessionid, req.range, tablename, cfnames);
-            requests.put(req, this);
-            completedLatch = new CountDownLatch(cfnames.length);
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
@@ -616,7 +618,7 @@ public class AntiEntropyService
 
         private RepairSession(String id, Range range, String tablename, String[] cfnames)
         {
-            super(id);
+            this.sessionName = id;
             this.tablename = tablename;
             this.cfnames = cfnames;
             assert cfnames.length > 0 : "Repairing no column families seems pointless,
doesn't it";
@@ -624,8 +626,18 @@ public class AntiEntropyService
             this.endpoints = AntiEntropyService.getNeighbors(tablename, range);
         }
 
-        @Override
-        public void run()
+        public String getName()
+        {
+            return sessionName;
+        }
+
+        RepairFuture getFuture()
+        {
+            return new RepairFuture(this);
+        }
+
+        // we don't care about the return value but care about it throwing exception
+        public void runMayThrow() throws Exception
         {
             if (endpoints.isEmpty())
             {
@@ -646,20 +658,25 @@ public class AntiEntropyService
             }
 
             AntiEntropyService.instance.sessions.put(getName(), this);
+            Gossiper.instance.register(this);
+            FailureDetector.instance.registerFailureDetectionEventListener(this);
             try
             {
                 // Create and queue a RepairJob for each column family
                 for (String cfname : cfnames)
-                    jobs.offer(new RepairJob(cfname));
-
-                // We'll repair once by endpoints and column family
-                completedLatch = new CountDownLatch(endpoints.size() * cfnames.length);
+                {
+                    RepairJob job = new RepairJob(cfname);
+                    jobs.offer(job);
+                    activeJobs.put(cfname, job);
+                }
 
                 jobs.peek().sendTreeRequests();
 
                 // block whatever thread started this session until all requests have been
returned:
                 // if this thread dies, the session will still complete in the background
-                completedLatch.await();
+                completed.await();
+                if (exception != null)
+                    throw exception;
             }
             catch (InterruptedException e)
             {
@@ -667,6 +684,8 @@ public class AntiEntropyService
             }
             finally
             {
+                FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+                Gossiper.instance.unregister(this);
                 AntiEntropyService.instance.sessions.remove(getName());
             }
         }
@@ -674,20 +693,69 @@ public class AntiEntropyService
         void completed(InetAddress remote, String cfname)
         {
             logger.debug("Repair completed for {} on {}", remote, cfname);
-            completedLatch.countDown();
+            RepairJob job = activeJobs.get(cfname);
+            if (job.completedSynchronizationJob(remote))
+            {
+                activeJobs.remove(cfname);
+                if (activeJobs.isEmpty())
+                    completed.signalAll();
+            }
+        }
+
+        void failedNode(InetAddress remote)
+        {
+            String errorMsg = String.format("Problem during repair session %s, endpoint %s died", sessionName, remote);
+            logger.error(errorMsg);
+            exception = new IOException(errorMsg);
+            // If a node failed, we stop everything (though there could still be some activity in the background)
+            jobs.clear();
+            activeJobs.clear();
+            differencingDone.signalAll();
+            completed.signalAll();
+        }
+
+        public void onJoin(InetAddress endpoint, EndpointState epState) {}
+        public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue
value) {}
+        public void onAlive(InetAddress endpoint, EndpointState state) {}
+        public void onDead(InetAddress endpoint, EndpointState state) {}
+
+        public void onRemove(InetAddress endpoint)
+        {
+            convict(endpoint, Double.MAX_VALUE);
+        }
+
+        public void onRestart(InetAddress endpoint, EndpointState epState)
+        {
+            convict(endpoint, Double.MAX_VALUE);
+        }
+
+        public void convict(InetAddress endpoint, double phi)
+        {
+            if (!endpoints.contains(endpoint))
+                return;
+
+            // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+            if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+                return;
+
+            // Though unlikely, it is possible to arrive here multiple times and we
+            // want to avoid printing an error message twice
+            if (!isFailed.compareAndSet(false, true))
+                return;
+
+            failedNode(endpoint);
         }
 
         class RepairJob
         {
             private final String cfname;
-            private final AtomicInteger remaining;
-            private final Map<InetAddress, MerkleTree> trees;
+            private final Set<InetAddress> requestedEndpoints = new HashSet<InetAddress>();
+            private final Map<InetAddress, MerkleTree> trees = new HashMap<InetAddress,
MerkleTree>();
+            private final Set<InetAddress> syncJobs = new HashSet<InetAddress>();
 
             public RepairJob(String cfname)
             {
                 this.cfname = cfname;
-                this.remaining = new AtomicInteger(endpoints.size() + 1); // all neighbor
+ local host
-                this.trees = new ConcurrentHashMap<InetAddress, MerkleTree>();
             }
 
             /**
@@ -695,22 +763,24 @@ public class AntiEntropyService
              */
             public void sendTreeRequests()
             {
-                // send requests to remote nodes and record them
-                for (InetAddress endpoint : endpoints)
-                    requests.put(AntiEntropyService.instance.request(getName(), endpoint,
range, tablename, cfname), RepairSession.this);
-                // send but don't record an outstanding request to the local node
-                AntiEntropyService.instance.request(getName(), FBUtilities.getBroadcastAddress(),
range, tablename, cfname);
+                requestedEndpoints.addAll(endpoints);
+                requestedEndpoints.add(FBUtilities.getBroadcastAddress());
+
+                // send requests to all nodes
+                for (InetAddress endpoint : requestedEndpoints)
+                    AntiEntropyService.instance.request(getName(), endpoint, range, tablename,
cfname);
             }
 
             /**
              * Add a new received tree and return the number of remaining tree to
              * be received for the job to be complete.
              */
-            public int addTree(TreeRequest request, MerkleTree tree)
+            public synchronized int addTree(TreeRequest request, MerkleTree tree)
             {
                 assert request.cf.right.equals(cfname);
                 trees.put(request.endpoint, tree);
-                return remaining.decrementAndGet();
+                requestedEndpoints.remove(request.endpoint);
+                return requestedEndpoints.size();
             }
 
             /**
@@ -719,7 +789,7 @@ public class AntiEntropyService
              */
             public void submitDifferencers()
             {
-                assert remaining.get() == 0;
+                assert requestedEndpoints.size() == 0;
 
                 // Right now, we only difference local host against each other. CASSANDRA-2610
will fix that.
                 // In the meantime ugly special casing will work good enough.
@@ -732,9 +802,17 @@ public class AntiEntropyService
                         continue;
 
                     Differencer differencer = new Differencer(cfname, entry.getKey(), entry.getValue(),
localTree);
+                    syncJobs.add(entry.getKey());
                     logger.debug("Queueing comparison " + differencer);
                     StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
                 }
+                trees.clear(); // allows gc to do its thing
+            }
+
+            synchronized boolean completedSynchronizationJob(InetAddress remote)
+            {
+                syncJobs.remove(remote);
+                return syncJobs.isEmpty();
             }
         }
 
@@ -841,11 +919,21 @@ public class AntiEntropyService
                         return;
 
                     // all calls finished successfully
-                    //
                     completed(remote, cfname);
-                    logger.info(String.format("Finished streaming repair with %s for %s: %d oustanding to complete session", remote, range, completedLatch.getCount()));
+                    logger.info(String.format("Finished streaming repair with %s for %s", remote, range));
                 }
             }
         }
     }
+
+    public static class RepairFuture extends FutureTask
+    {
+        public final RepairSession session;
+
+        RepairFuture(RepairSession session)
+        {
+            super(session, null);
+            this.session = session;
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/LoadBroadcaster.java Wed Aug 31
16:24:13 2011
@@ -62,6 +62,8 @@ public class LoadBroadcaster implements 
 
     public void onDead(InetAddress endpoint, EndpointState state) {}
 
+    public void onRestart(InetAddress endpoint, EndpointState state) {}
+
     public void onRemove(InetAddress endpoint) {}
 
     public Map<InetAddress, Double> getLoadInfo()

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java Wed Aug 31 16:24:13 2011
@@ -78,6 +78,8 @@ public class MigrationManager implements
 
     public void onDead(InetAddress endpoint, EndpointState state) { }
 
+    public void onRestart(InetAddress endpoint, EndpointState state) { }
+
     public void onRemove(InetAddress endpoint) { }
     
     /** 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 31 16:24:13 2011
@@ -1268,6 +1268,13 @@ public class StorageService implements I
         MessagingService.instance().convict(endpoint);
     }
 
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        // If we have restarted before the node was even marked down, we need to reset the connection pool
+        if (state.isAlive())
+            onDead(endpoint, state);
+    }
+
     /** raw load value */
     public double getLoad()
     {
@@ -1564,44 +1571,43 @@ public class StorageService implements I
         {
             return;
         }
-        
-        List<AntiEntropyService.RepairSession> sessions = new ArrayList<AntiEntropyService.RepairSession>();
+        List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
         for (Range range : getLocalRanges(tableName))
         {
-            AntiEntropyService.RepairSession session = forceTableRepair(range, tableName, columnFamilies);
-            sessions.add(session);
+            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, columnFamilies);
+            futures.add(future);
             // wait for a session to be done with its differencing before starting the next one
             try
             {
-                session.differencingDone.await();
+                future.session.differencingDone.await();
             }
             catch (InterruptedException e)
             {
-                logger_.error("Interrupted while waiting for the differencing of repair session " + session + " to be done. Repair may be imprecise.", e);
+                logger_.error("Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.", e);
             }
         }
 
         boolean failedSession = false;
 
         // block until all repair sessions have completed
-        for (AntiEntropyService.RepairSession sess : sessions)
+        for (AntiEntropyService.RepairFuture future : futures)
         {
             try
             {
-                sess.join();
+                future.get();
             }
-            catch (InterruptedException e)
+            catch (Exception e)
             {
-                logger_.error("Repair session " + sess + " failed.", e);
+                logger_.error("Repair session " + future.session + " failed.", e);
                 failedSession = true;
             }
         }
 
         if (failedSession)
-            throw new IOException("Some Repair session(s) failed.");
+            throw new IOException("Some repair session(s) failed (see log for details).");
     }
 
-    public AntiEntropyService.RepairSession forceTableRepair(final Range range, final String tableName, final String... columnFamilies) throws IOException
+    public AntiEntropyService.RepairFuture forceTableRepair(final Range range, final String tableName, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
@@ -1609,9 +1615,7 @@ public class StorageService implements I
             names.add(cfStore.getColumnFamilyName());
         }
 
-        AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(range, tableName, names.toArray(new String[names.size()]));
-        sess.start();
-        return sess;
+        return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()]));
     }
 
     /* End of MBean interface methods */

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1163685&r1=1163684&r2=1163685&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Wed Aug 31 16:24:13 2011
@@ -20,9 +20,7 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.*;
 
 import org.apache.cassandra.config.Schema;
 import org.junit.After;
@@ -47,6 +45,7 @@ import org.apache.cassandra.utils.Merkle
 import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public abstract class AntiEntropyServiceTestAbstract extends CleanupHelper
@@ -162,18 +161,21 @@ public abstract class AntiEntropyService
     @Test
     public void testManualRepair() throws Throwable
     {
-        AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getRepairSession(local_range, tablename, cfname);
-        sess.start();
+        RepairFuture sess = AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname);
 
         // ensure that the session doesn't end without a response from REMOTE
-        sess.join(500);
-        assert sess.isAlive();
+        try
+        {
+            sess.get(500, TimeUnit.MILLISECONDS);
+            fail("Repair session should not end without response from REMOTE");
+        }
+        catch (TimeoutException e) {}
 
         // deliver a fake response from REMOTE
-        sess.completed(REMOTE, request.cf.right);
+        sess.session.completed(REMOTE, request.cf.right);
 
         // block until the repair has completed
-        sess.join();
+        sess.get();
     }
 
     @Test
@@ -218,8 +220,8 @@ public abstract class AntiEntropyService
     public void testDifferencer() throws Throwable
     {
         // this next part does some housekeeping so that cleanup in the differencer doesn't error out.
-        AntiEntropyService.RepairSession sess = AntiEntropyService.instance.getArtificialRepairSession(request, tablename, cfname);
-        
+        AntiEntropyService.RepairFuture sess = AntiEntropyService.instance.submitArtificialRepairSession(request, tablename, cfname);
+
         // generate a tree
         Validator validator = new Validator(request);
         validator.prepare(store);
@@ -242,7 +244,7 @@ public abstract class AntiEntropyService
         interesting.add(changed);
 
         // difference the trees
-        AntiEntropyService.RepairSession.Differencer diff = sess.new Differencer(cfname, request.endpoint, ltree, rtree);
+        AntiEntropyService.RepairSession.Differencer diff = sess.session.new Differencer(cfname, request.endpoint, ltree, rtree);
         diff.run();
         
         // ensure that the changed range was recorded



Mime
View raw message