cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r902981 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/net: FileStreamTask.java OutboundTcpConnection.java
Date Mon, 25 Jan 2010 21:26:29 GMT
Author: gdusbabek
Date: Mon Jan 25 21:26:28 2010
New Revision: 902981

URL: http://svn.apache.org/viewvc?rev=902981&view=rev
Log:
bind outgoing sockets to the locally specified cassandra interface (avoids using the result
of InetAddress.anyLocalAddress(), which may not be the right cassandra interface). Patch by
Gary Dusbabek, reviewed by Jonathan Ellis.

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Mon Jan
25 21:26:28 2010
@@ -25,6 +25,7 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -51,7 +52,11 @@
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = SocketChannel.open(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        SocketChannel channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        // obey the unwritten law that all nodes on a cluster must use the same storage port.
+        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
         try
         {
             stream(channel);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Mon Jan 25 21:26:28 2010
@@ -1,148 +1,150 @@
-package org.apache.cassandra.net;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-public class OutboundTcpConnection extends Thread
-{
-    private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
-
-    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
-    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
-    private final OutboundTcpConnectionPool pool;
-    private final InetAddress endpoint;
-    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
-    private DataOutputStream output;
-    private Socket socket;
-
-    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress
remoteEp)
-    {
-        super("WRITE-" + remoteEp);
-        this.pool = pool;
-        this.endpoint = remoteEp;
-    }
-
-    public void write(ByteBuffer buffer)
-    {
-        try
-        {
-            queue.put(buffer);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void closeSocket()
-    {
-        queue.clear();
-        write(CLOSE_SENTINEL);
-    }
-
-    public void run()
-    {
-        while (true)
-        {
-            ByteBuffer bb = take();
-            if (bb == CLOSE_SENTINEL)
-            {
-                disconnect();
-                continue;
-            }
-            if (socket != null || connect())
-                writeConnected(bb);
-        }
-    }
-
-    private void writeConnected(ByteBuffer bb)
-    {
-        try
-        {
-            output.write(bb.array(), 0, bb.limit());
-            if (queue.peek() == null)
-            {
-                output.flush();
-            }
-        }
-        catch (IOException e)
-        {
-            logger.info("error writing to " + endpoint);
-            disconnect();
-        }
-    }
-
-    private void disconnect()
-    {
-        if (socket != null)
-        {
-            try
-            {
-                socket.close();
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("exception closing connection to " + endpoint, e);
-            }
-            output = null;
-            socket = null;
-        }
-    }
-
-    private ByteBuffer take()
-    {
-        ByteBuffer bb;
-        try
-        {
-            bb = queue.take();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        return bb;
-    }
-
-    private boolean connect()
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("attempting to connect to " + endpoint);
-        long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
-        {
-            try
-            {
-                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort());
-                socket.setTcpNoDelay(true);
-                output = new DataOutputStream(socket.getOutputStream());
-                return true;
-            }
-            catch (IOException e)
-            {
-                socket = null;
-                if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + endpoint, e);
-                try
-                {
-                    Thread.sleep(OPEN_RETRY_DELAY);
-                }
-                catch (InterruptedException e1)
-                {
-                    throw new AssertionError(e1);
-                }
-            }
-        }
-        return false;
-    }
-}
+package org.apache.cassandra.net;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class OutboundTcpConnection extends Thread
+{
+    private static final Logger logger = Logger.getLogger(OutboundTcpConnection.class);
+
+    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+
+    private final OutboundTcpConnectionPool pool;
+    private final InetAddress endpoint;
+    private final BlockingQueue<ByteBuffer> queue = new LinkedBlockingQueue<ByteBuffer>();
+    private DataOutputStream output;
+    private Socket socket;
+
+    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final InetAddress
remoteEp)
+    {
+        super("WRITE-" + remoteEp);
+        this.pool = pool;
+        this.endpoint = remoteEp;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        try
+        {
+            queue.put(buffer);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void closeSocket()
+    {
+        queue.clear();
+        write(CLOSE_SENTINEL);
+    }
+
+    public void run()
+    {
+        while (true)
+        {
+            ByteBuffer bb = take();
+            if (bb == CLOSE_SENTINEL)
+            {
+                disconnect();
+                continue;
+            }
+            if (socket != null || connect())
+                writeConnected(bb);
+        }
+    }
+
+    private void writeConnected(ByteBuffer bb)
+    {
+        try
+        {
+            output.write(bb.array(), 0, bb.limit());
+            if (queue.peek() == null)
+            {
+                output.flush();
+            }
+        }
+        catch (IOException e)
+        {
+            logger.info("error writing to " + endpoint);
+            disconnect();
+        }
+    }
+
+    private void disconnect()
+    {
+        if (socket != null)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("exception closing connection to " + endpoint, e);
+            }
+            output = null;
+            socket = null;
+        }
+    }
+
+    private ByteBuffer take()
+    {
+        ByteBuffer bb;
+        try
+        {
+            bb = queue.take();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        return bb;
+    }
+
+    private boolean connect()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("attempting to connect to " + endpoint);
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
+        {
+            try
+            {
+                // zero means 'bind on any available port.'
+                socket = new Socket(endpoint, DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(),
0);
+                socket.setTcpNoDelay(true);
+                output = new DataOutputStream(socket.getOutputStream());
+                return true;
+            }
+            catch (IOException e)
+            {
+                socket = null;
+                if (logger.isTraceEnabled())
+                    logger.trace("unable to connect to " + endpoint, e);
+                try
+                {
+                    Thread.sleep(OPEN_RETRY_DELAY);
+                }
+                catch (InterruptedException e1)
+                {
+                    throw new AssertionError(e1);
+                }
+            }
+        }
+        return false;
+    }
+}



Mime
View raw message