cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amor...@apache.org
Subject git commit: Ensure unique streaming session id's
Date Fri, 18 May 2012 18:28:26 GMT
Updated Branches:
  refs/heads/cassandra-1.1 2c01cba39 -> bbe105cfa


Ensure unique streaming session id's

Patch by Aaron Morton, reviewed by Yuki Morishita for CASSANDRA-4223


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bbe105cf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bbe105cf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bbe105cf

Branch: refs/heads/cassandra-1.1
Commit: bbe105cfa556831041efc27a56b4584bc5b5e143
Parents: 2c01cba
Author: Aaron Morton <aaron@the-mortons.org>
Authored: Fri May 18 15:17:39 2012 +1200
Committer: Aaron Morton <aaron@the-mortons.org>
Committed: Sat May 19 06:27:39 2012 +1200

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../cassandra/streaming/AbstractStreamSession.java |    5 +++
 .../apache/cassandra/streaming/StreamHeader.java   |    5 +++
 .../cassandra/streaming/StreamInSession.java       |   25 ++++++++++++++-
 .../cassandra/streaming/StreamOutSession.java      |   22 ++++++++++++-
 5 files changed, 56 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bbe105cf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a375f36..fe448f2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -61,7 +61,7 @@ Merged from 1.0:
  * stress tool to return appropriate exit code on failure (CASSANDRA-4188)
  * fix compaction NPE when out of disk space and assertions disabled
    (CASSANDRA-3985)
-
+ * ensure unique streaming session id's (CASSANDRA-4223)
 
 1.1.0-final
  * average a reduced liveRatio estimate with the previous one (CASSANDRA-4065)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bbe105cf/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
index 1938e3d..d190506 100644
--- a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java
@@ -46,6 +46,11 @@ public abstract class AbstractStreamSession implements IEndpointStateChangeSubsc
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
+    public int getSourceFlag()
+    {
+        return (int)(context.right >> 32);
+    }
+
     public long getSessionId()
     {
         return context.right;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bbe105cf/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
index 793e19e..5ffe03e 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java
@@ -37,6 +37,11 @@ public class StreamHeader
 {
     private static IVersionedSerializer<StreamHeader> serializer;
 
+    // Streaming sessionId flags, used to avoid duplicate session id's between nodes.
+    // See StreamInSession and StreamOutSession
+    public static final int STREAM_IN_SOURCE_FLAG = 0;
+    public static final int STREAM_OUT_SOURCE_FLAG = 1;
+
     static
     {
         serializer = new StreamHeaderSerializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bbe105cf/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index 1660aaa..e123714 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -24,6 +24,7 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
@@ -52,6 +53,28 @@ public class StreamInSession extends AbstractStreamSession
     private PendingFile current;
     private Socket socket;
     private volatile int retries;
+    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
+
+    /**
+     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
+     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
+     * following contexts:
+     *
+     * <1.1.1.1, (stream_in_flag, 6)>
+     * <1.1.1.1, (stream_out_flag, 6)>
+     *
+     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
+     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
+     * created by this node.
+     *
+     * Note: The StreamInSession results in a StreamOutSession on the target that uses the
StreamInSession sessionId.
+     *
+     * @return next StreamInSession sessionId
+     */
+    private static long nextSessionId()
+    {
+        return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
+    }
 
     private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback)
     {
@@ -60,7 +83,7 @@ public class StreamInSession extends AbstractStreamSession
 
     public static StreamInSession create(InetAddress host, IStreamCallback callback)
     {
-        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, nextSessionId());
         StreamInSession session = new StreamInSession(context, callback);
         sessions.put(context, session);
         return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bbe105cf/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index 7a53c7c..60de2cb 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -41,10 +42,29 @@ public class StreamOutSession extends AbstractStreamSession
 
     // one host may have multiple stream sessions.
     private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession>
streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
+    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
+
+    /**
+     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
+     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
+     * following contexts:
+     *
+     * <1.1.1.1, (stream_in_flag, 6)>
+     * <1.1.1.1, (stream_out_flag, 6)>
+     *
+     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
+     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
+     * created by this node.
+     * @return next StreamOutSession sessionId
+     */
+    private static long nextSessionId()
+    {
+        return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
+    }
 
     public static StreamOutSession create(String table, InetAddress host, IStreamCallback
callback)
     {
-        return create(table, host, System.nanoTime(), callback);
+        return create(table, host, nextSessionId(), callback);
     }
 
     public static StreamOutSession create(String table, InetAddress host, long sessionId)


Mime
View raw message