cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1001532 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/service/
Date Sun, 26 Sep 2010 21:53:50 GMT
Author: jbellis
Date: Sun Sep 26 21:53:50 2010
New Revision: 1001532

URL: http://svn.apache.org/viewvc?rev=1001532&view=rev
Log:
add repair callbacks to track session completion.
patch by Stu Hood; reviewed by jbellis for CASSANDRA-1511

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Sep 26 21:53:50 2010
@@ -94,6 +94,7 @@
    saved property, instead of vice versa (CASSANDRA-1447)
  * JMX MessagingService pending and completed counts (CASSANDRA-1533)
  * fix race condition processing repair responses (CASSANDRA-1511)
+ * make repair blocking (CASSANDRA-1511)
 
 
 0.7-beta1

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Sun Sep 26 21:53:50 2010
@@ -24,6 +24,7 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Objects;
 import org.slf4j.Logger;
@@ -103,7 +104,7 @@ public class AntiEntropyService
     /**
      * A map of repair session ids to a Queue of TreeRequests that have been performed since the session was started.
      */
-    private final ConcurrentMap<String, BlockingQueue<TreeRequest>> sessions;
+    private final ConcurrentMap<String, RepairSession.Callback> sessions;
 
     /**
      * Protected constructor. Use AntiEntropyService.instance.
@@ -111,7 +112,7 @@ public class AntiEntropyService
     protected AntiEntropyService()
     {
         requests = new ExpiringMap<String, Map<TreeRequest, TreePair>>(REQUEST_TIMEOUT);
-        sessions = new ConcurrentHashMap<String, BlockingQueue<TreeRequest>>();
+        sessions = new ConcurrentHashMap<String, RepairSession.Callback>();
     }
 
     /**
@@ -129,11 +130,7 @@ public class AntiEntropyService
     void completedRequest(TreeRequest request)
     {
         // indicate to the waiting session that this request completed
-        BlockingQueue<TreeRequest> session = sessions.get(request.sessionid);
-        if (session == null)
-            // repair client disconnected: ignore
-            return;
-        session.offer(request);
+        sessions.get(request.sessionid).completed(request);
     }
 
     /**
@@ -429,7 +426,7 @@ public class AntiEntropyService
     }
 
     /**
-     * Runs on the node that initiated a request to compares two trees, and launch repairs for disagreeing ranges.
+     * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
      */
     public static class Differencer implements Runnable
     {
@@ -479,24 +476,24 @@ public class AntiEntropyService
             
             // choose a repair method based on the significance of the difference
             float difference = differenceFraction();
+            String format = "Endpoints " + local + " and " + request.endpoint + " are %s for " + request.cf;
             if (difference == 0.0)
             {
-                logger.info("Endpoints " + local + " and " + request.endpoint + " are consistent for " + request.cf);
+                logger.info(String.format(format, "consistent"));
+                AntiEntropyService.instance.completedRequest(request);
+                return;
             }
-            else
+
+            // non-0 difference: perform streaming repair
+            logger.info(String.format(format, (difference * 100) + "% out of sync"));
+            try
             {
-                try
-                {
-                    performStreamingRepair();
-                }
-                catch(IOException e)
-                {
-                    throw new RuntimeException(e);
-                }
+                performStreamingRepair();
+            }
+            catch(IOException e)
+            {
+                throw new RuntimeException(e);
             }
-
-            // repair was completed successfully: notify any waiting sessions
-            AntiEntropyService.instance.completedRequest(request);
         }
         
         /**
@@ -512,8 +509,8 @@ public class AntiEntropyService
         }
 
         /**
-         * Sends our list of differences to the remote endpoint using the
-         * Streaming API.
+         * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+         * that will be called out of band once the streams complete.
          */
         void performStreamingRepair() throws IOException
         {
@@ -521,36 +518,46 @@ public class AntiEntropyService
             ColumnFamilyStore cfstore = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
             try
             {
-                final List<Range> ranges = new ArrayList<Range>(differences);
-                final Collection<SSTableReader> sstables = cfstore.getSSTables();
+                List<Range> ranges = new ArrayList<Range>(differences);
+                Collection<SSTableReader> sstables = cfstore.getSSTables();
+                Callback callback = new Callback();
                 // send ranges to the remote node
-                Future f = StageManager.getStage(Stage.STREAM).submit(new WrappedRunnable()
-                {
-                    protected void runMayThrow() throws Exception
-                    {
-                        StreamOutSession session = StreamOutSession.create(request.cf.left, request.endpoint, null);
-                        StreamOut.transferSSTables(session, sstables, ranges);
-                    }
-                });
+                StreamOutSession outsession = StreamOutSession.create(request.cf.left, request.endpoint, callback);
+                StreamOut.transferSSTables(outsession, sstables, ranges);
                 // request ranges from the remote node
-                // FIXME: no way to block for the 'requestRanges' call to complete, or to request a
-                // particular cf: see CASSANDRA-1189
-                StreamIn.requestRanges(request.endpoint, request.cf.left, ranges);
-                
-                // wait until streaming has completed
-                f.get();
+                StreamIn.requestRanges(request.endpoint, request.cf.left, ranges, callback);
             }
             catch(Exception e)
             {
                 throw new IOException("Streaming repair failed.", e);
             }
-            logger.info("Finished streaming repair for " + request);
         }
 
         public String toString()
         {
             return "#<Differencer " + request + ">";
         }
+
+        /**
+         * When a repair is necessary, this callback is created to wait for the inbound
+         * and outbound streams to complete.
+         */
+        class Callback extends WrappedRunnable
+        {
+            // we expect one callback for the receive, and one for the send
+            private final AtomicInteger outstanding = new AtomicInteger(2);
+
+            protected void runMayThrow() throws Exception
+            {
+                if (outstanding.decrementAndGet() > 0)
+                    // waiting on more calls
+                    return;
+
+                // all calls finished successfully
+                logger.info("Finished streaming repair for " + request);
+                AntiEntropyService.instance.completedRequest(request);
+            }
+        }
     }
 
     /**
@@ -743,18 +750,21 @@ public class AntiEntropyService
 
     /**
      * Triggers repairs with all neighbors for the given table and cfs. Typical lifecycle is: start() then join().
+     * Executed in client threads.
      */
     class RepairSession extends Thread
     {
         private final String tablename;
         private final String[] cfnames;
         private final SimpleCondition requestsMade;
+        private final ConcurrentHashMap<TreeRequest,Object> requests;
         public RepairSession(String tablename, String... cfnames)
         {
             super("manual-repair-" + UUID.randomUUID());
             this.tablename = tablename;
             this.cfnames = cfnames;
             this.requestsMade = new SimpleCondition();
+            this.requests = new ConcurrentHashMap<TreeRequest,Object>();
         }
 
         /**
@@ -769,39 +779,60 @@ public class AntiEntropyService
         public void run()
         {
             // begin a repair session
-            BlockingQueue<TreeRequest> completed = new LinkedBlockingQueue<TreeRequest>();
-            AntiEntropyService.this.sessions.put(getName(), completed);
+            Callback callback = new Callback();
+            AntiEntropyService.this.sessions.put(getName(), callback);
             try
             {
                 // request that all relevant endpoints generate trees
-                Set<TreeRequest> requests = new HashSet<TreeRequest>();
                 Set<InetAddress> endpoints = AntiEntropyService.getNeighbors(tablename);
                 for (String cfname : cfnames)
                 {
                     // send requests to remote nodes and record them
                     for (InetAddress endpoint : endpoints)
-                        requests.add(AntiEntropyService.this.request(getName(), endpoint, tablename, cfname));
+                        requests.put(AntiEntropyService.this.request(getName(), endpoint, tablename, cfname), this);
                     // send but don't record an outstanding request to the local node
                     AntiEntropyService.this.request(getName(), FBUtilities.getLocalAddress(), tablename, cfname);
                 }
+                logger.info("Waiting for repair requests: " + requests.keySet());
                 requestsMade.signalAll();
 
-                // block until all requests have been returned by completedRequest calls
-                logger.info("Waiting for repair requests to: " + requests);
-                while (!requests.isEmpty())
-                {
-                    TreeRequest request = completed.take();
-                    logger.info("Repair request to " + request + " completed successfully.");
-                    requests.remove(request);
-                }
+                // block whatever thread started this session until all requests have been returned:
+                // if this thread dies, the session will still complete in the background
+                callback.completed.await();
             }
             catch (InterruptedException e)
             {
                 throw new RuntimeException("Interrupted while waiting for repair: repair will continue in the background.");
             }
-            finally
+        }
+
+        /**
+         * Receives notifications of completed requests, and sets a condition when all requests
+         * triggered by this session have completed.
+         */
+        class Callback
+        {
+            public final SimpleCondition completed = new SimpleCondition();
+            public void completed(TreeRequest request)
             {
+                // don't mark any requests completed until all requests have been made
+                try
+                {
+                    blockUntilRunning();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+                requests.remove(request);
+                logger.info("{} completed successfully: {} outstanding.", request, requests.size());
+                if (!requests.isEmpty())
+                    return;
+
+                // all requests completed
+                logger.info("Session " + getName() + " completed successfully.");
                 AntiEntropyService.this.sessions.remove(getName());
+                completed.signalAll();
             }
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Sun Sep 26 21:53:50 2010
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.Pair;
@@ -80,7 +82,7 @@ public class PendingFile
 
     public String toString()
     {
-        return getFilename() + "/" + sections;
+        return getFilename() + "/" + StringUtils.join(sections, ",");
     }
 
     public static class PendingFileSerializer implements ICompactSerializer<PendingFile>

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Sep 26 21:53:50 2010
@@ -26,9 +26,9 @@ import java.io.IOError;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.db.Table;
@@ -72,7 +72,8 @@ public class StreamOut
         // this is so that this target shows up as a destination while anticompaction is happening.
         StreamOutSession session = StreamOutSession.create(tableName, target, callback);
 
-        logger.info("Beginning transfer process to {} for ranges {}", target, StringUtils.join(ranges, ", "));
+        logger.info("Beginning transfer to {}", target);
+        logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
 
         try
         {
@@ -120,7 +121,8 @@ public class StreamOut
     {
         assert ranges.size() > 0;
 
-        logger.info("Beginning transfer process to {} for ranges {}", session.getHost(), StringUtils.join(ranges, ", "));
+        logger.info("Beginning transfer to {}", session.getHost());
+        logger.debug("Ranges are {}", StringUtils.join(ranges, ","));
 
         try
         {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java?rev=1001532&r1=1001531&r2=1001532&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTest.java Sun Sep 26 21:53:50 2010
@@ -125,9 +125,6 @@ public class AntiEntropyServiceTest exte
         // confirm that the tree was validated
         Token min = validator.tree.partitioner().getMinimumToken();
         assert null != validator.tree.hash(new Range(min, min));
-
-        // wait for queued operations to be flushed
-        flushAES();
     }
 
     @Test
@@ -174,7 +171,7 @@ public class AntiEntropyServiceTest exte
     public void testGetNeighborsPlusOne() throws Throwable
     {
         // generate rf+1 nodes, and ensure that all nodes are returned
-        Set<InetAddress> expected = addTokens(1 + 1 + DatabaseDescriptor.getReplicationFactor(tablename));
+        Set<InetAddress> expected = addTokens(1 + DatabaseDescriptor.getReplicationFactor(tablename));
         expected.remove(FBUtilities.getLocalAddress());
         assertEquals(expected, AntiEntropyService.getNeighbors(tablename));
     }
@@ -185,7 +182,7 @@ public class AntiEntropyServiceTest exte
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 
         // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
-        addTokens(1 + (2 * DatabaseDescriptor.getReplicationFactor(tablename)));
+        addTokens(2 * DatabaseDescriptor.getReplicationFactor(tablename));
         AbstractReplicationStrategy ars = StorageService.instance.getReplicationStrategy(tablename);
         Set<InetAddress> expected = new HashSet<InetAddress>();
         for (Range replicaRange : ars.getAddressRanges().get(FBUtilities.getLocalAddress()))
@@ -230,7 +227,7 @@ public class AntiEntropyServiceTest exte
     {
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
         Set<InetAddress> endpoints = new HashSet<InetAddress>();
-        for (int i = 1; i < max; i++)
+        for (int i = 1; i <= max; i++)
         {
             InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
             tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);



Mime
View raw message