cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r948531 - in /cassandra/branches/cassandra-0.6: CHANGES.txt src/java/org/apache/cassandra/net/FileStreamTask.java
Date Wed, 26 May 2010 18:40:29 GMT
Author: gdusbabek
Date: Wed May 26 18:40:29 2010
New Revision: 948531

URL: http://svn.apache.org/viewvc?rev=948531&view=rev
Log:
retry streaming connections up to 8 times. patch by stuhood, reviewed by gdusbabek. CASSANDRA-1019

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=948531&r1=948530&r2=948531&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed May 26 18:40:29 2010
@@ -1,3 +1,6 @@
+0.6.3
+ * retry to make streaming connections up to 8 times. (CASSANDRA-1019)
+
 0.6.2
  * fix contrib/word_count build. (CASSANDRA-992)
  * split CommitLogExecutorService into BatchCommitLogExecutorService and 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=948531&r1=948530&r2=948531&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java Wed May 26 18:40:29 2010
@@ -37,6 +37,8 @@ public class FileStreamTask extends Wrap
     private static Logger logger = Logger.getLogger( FileStreamTask.class );
     
     public static final int CHUNK_SIZE = 32*1024*1024;
+    // around 10 minutes at the default rpctimeout
+    public static final int MAX_CONNECT_ATTEMPTS = 8;
 
     private final String file;
     private final long startPosition;
@@ -53,11 +55,10 @@ public class FileStreamTask extends Wrap
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = SocketChannel.open();
-        // force local binding on correctly specified interface.
-        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
-        // obey the unwritten law that all nodes on a cluster must use the same storage port.
-        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        SocketChannel channel = connect();
+
+        // successfully connected: stream.
+        // (at this point, if we fail, it is the receiver's job to re-request)
         try
         {
             stream(channel);
@@ -112,4 +113,41 @@ public class FileStreamTask extends Wrap
         }
     }
 
+    /**
+     * Connects to the destination, with backoff for failed attempts.
+     * TODO: all nodes on a cluster must currently use the same storage port
+     * @throws IOException If all attempts fail.
+     */
+    private SocketChannel connect() throws IOException
+    {
+        SocketChannel channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        int attempts = 0;
+        while (true)
+        {
+            try
+            {
+                channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+                // success
+                return channel;
+            }
+            catch (IOException e)
+            {
+                if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                    throw e;
+
+                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + file + ". Retrying in " + waitms + " ms. (" + e + ")");
+                try
+                {
+                    Thread.sleep(waitms);
+                }
+                catch (InterruptedException wtf)
+                {
+                    throw new RuntimeException(wtf);
+                }
+            }
+        }
+    }
 }



Mime
View raw message