cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject [2/2] git commit: Merge branch 'cassandra-1.2' into trunk
Date Thu, 27 Jun 2013 20:39:03 GMT
Merge branch 'cassandra-1.2' into trunk

Conflicts:
	src/java/org/apache/cassandra/net/OutboundTcpConnection.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cda3622
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cda3622
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cda3622

Branch: refs/heads/trunk
Commit: 4cda3622d0a86d45d73a31576fc8b39b9e66928d
Parents: b1f3fc0 296da81
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Thu Jun 27 13:37:27 2013 -0700
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Thu Jun 27 13:37:27 2013 -0700

----------------------------------------------------------------------
 .../cassandra/net/OutboundTcpConnection.java    | 59 +++++++++++++++++++-
 1 file changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cda3622/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 4c6f498,648123b..1bdead2
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -285,12 -293,10 +289,11 @@@ public class OutboundTcpConnection exte
          if (logger.isDebugEnabled())
              logger.debug("attempting to connect to " + poolReference.endPoint());
  
-         targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
- 
 -        long start = System.currentTimeMillis();
 -        while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout())
 +        long start = System.nanoTime();
 +        long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
 +        while (System.nanoTime() - start < timeout)
          {
+             targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
              try
              {
                  socket = poolReference.newSocket();
@@@ -316,35 -322,47 +319,44 @@@
                  }
                  out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096));
  
 -                if (targetVersion >= MessagingService.VERSION_12)
 -                {
 -                    out.writeInt(MessagingService.PROTOCOL_MAGIC);
 -                    writeHeader(out, targetVersion, shouldCompressConnection());
 -                    out.flush();
 +                out.writeInt(MessagingService.PROTOCOL_MAGIC);
 +                writeHeader(out, targetVersion, shouldCompressConnection());
 +                out.flush();
  
 -                    DataInputStream in = new DataInputStream(socket.getInputStream());
 -                    int maxTargetVersion = handshakeVersion(in);
 -                    if (maxTargetVersion == NO_VERSION) 
 -                    {
 -                        // no version is returned, so disconnect an try again: we will either get
 -                        // a different target version (targetVersion < MessagingService.VERSION_12)
 -                        // or if the same version the handshake will finally succeed
 -                        logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
 -                        disconnect();
 -                        continue;
 -                    }
 -                    if (targetVersion > maxTargetVersion)
 -                    {
 -                        logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
 -                        MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
 -                        disconnect();
 -                        return false;
 -                    }
 +                DataInputStream in = new DataInputStream(socket.getInputStream());
-                 int maxTargetVersion = in.readInt();
++                int maxTargetVersion = handshakeVersion(in);
++                if (maxTargetVersion == NO_VERSION)
++                {
++                    // no version is returned, so disconnect an try again: we will either get
++                    // a different target version (targetVersion < MessagingService.VERSION_12)
++                    // or if the same version the handshake will finally succeed
++                    logger.debug("Target max version is {}; no version information yet, will retry", maxTargetVersion);
++                    disconnect();
++                    continue;
++                }
 +                if (targetVersion > maxTargetVersion)
 +                {
 +                    logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion);
 +                    MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
 +                    disconnect();
 +                    return false;
 +                }
  
 -                    if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
 -                    {
 -                        logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
 -                                     maxTargetVersion, targetVersion);
 -                        MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
 -                        softCloseSocket();
 -                    }
 +                if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version)
 +                {
 +                    logger.trace("Detected higher max version {} (using {}); will reconnect when queued messages are done",
 +                                 maxTargetVersion, targetVersion);
 +                    MessagingService.instance().setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion));
 +                    softCloseSocket();
 +                }
  
 -                    out.writeInt(MessagingService.current_version);
 -                    CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
 -                    if (shouldCompressConnection())
 -                    {
 -                        out.flush();
 -                        logger.trace("Upgrading OutputStream to be compressed");
 -                        out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
 -                    }
 +                out.writeInt(MessagingService.current_version);
 +                CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out);
 +                if (shouldCompressConnection())
 +                {
 +                    out.flush();
 +                    logger.trace("Upgrading OutputStream to be compressed");
 +                    out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream())));
                  }
  
                  return true;


Mime
View raw message