cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amor...@apache.org
Subject git commit: Ensure unique streaming session id's
Date Fri, 18 May 2012 18:14:09 GMT
Updated Branches:
  refs/heads/cassandra-1.0 0077edc94 -> 2db61ab9e


Ensure unique streaming session id's

Patch by Aaron Morton, reviewed by Yuki Morishita for CASSANDRA-4223


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2db61ab9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2db61ab9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2db61ab9

Branch: refs/heads/cassandra-1.0
Commit: 2db61ab9e0a5fe7f764236ec060ee370c19bd593
Parents: 0077edc
Author: Aaron Morton <aaron@the-mortons.org>
Authored: Fri May 18 15:17:39 2012 +1200
Committer: Aaron Morton <aaron@the-mortons.org>
Committed: Fri May 18 15:17:39 2012 +1200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +
 .../apache/cassandra/streaming/StreamHeader.java   |    5 ++
 .../cassandra/streaming/StreamInSession.java       |   31 ++++++++++++++-
 .../cassandra/streaming/StreamOutSession.java      |   29 +++++++++++++-
 4 files changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1c1c184..d1d00f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,6 @@
+1.0.11
+ * ensure unique streaming session id's (CASSANDRA-4223)
+
 1.0.10
  * fix maxTimestamp to include row tombstones (CASSANDRA-4116)
  * avoid streaming empty files with bulk loader if sstablewriter errors out

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamHeader.java b/src/java/org/apache/cassandra/streaming/StreamHeader.java
index 95fcc0d..eb53a06 100644
--- a/src/java/org/apache/cassandra/streaming/StreamHeader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamHeader.java
@@ -33,6 +33,11 @@ public class StreamHeader
 {
     private static IVersionedSerializer<StreamHeader> serializer;
 
+    // Streaming sessionId flags, used to avoid duplicate session id's between nodes.
+    // See StreamInSession and StreamOutSession
+    public static final int STREAM_IN_SOURCE_FLAG = 0;
+    public static final int STREAM_OUT_SOURCE_FLAG = 1;
+
     static
     {
         serializer = new StreamHeaderSerializer();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamInSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java
index aa92d4d..feb82dd 100644
--- a/src/java/org/apache/cassandra/streaming/StreamInSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Table;
@@ -50,15 +51,38 @@ public class StreamInSession
     private final List<SSTableReader> readers = new ArrayList<SSTableReader>();
     private PendingFile current;
 
+    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
+
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
         this.context = context;
         this.callback = callback;
     }
 
+    /**
+     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
+     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
+     * following contexts:
+     *
+     * <1.1.1.1, (stream_in_flag, 6)>
+     * <1.1.1.1, (stream_out_flag, 6)>
+     *
+     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
+     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
+     * created by this node.
+     *
+     * Note: The StreamInSession results in a StreamOutSession on the target that uses the
StreamInSession sessionId.
+     *
+     * @return next StreamInSession sessionId
+     */
+    private static long nextSessionId()
+    {
+        return (((long)StreamHeader.STREAM_IN_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
+    }
+
     public static StreamInSession create(InetAddress host, Runnable callback)
     {
-        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime());
+        Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, nextSessionId());
         StreamInSession session = new StreamInSession(context, callback);
         sessions.put(context, session);
         return session;
@@ -168,6 +192,11 @@ public class StreamInSession
         }
     }
 
+    public int getSourceFlag()
+    {
+        return (int)(context.right >> 32);
+    }
+
     public long getSessionId()
     {
         return context.right;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2db61ab9/src/java/org/apache/cassandra/streaming/StreamOutSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
index 3cbb294..4d09821 100644
--- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java
@@ -23,6 +23,7 @@ import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -43,10 +44,29 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber,
IFailur
 
     // one host may have multiple stream sessions.
     private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession>
streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>();
+    private final static AtomicInteger sessionIdCounter = new AtomicInteger(0);
+
+    /**
+     * The next session id is a combination of a local integer counter and a flag used to
avoid collisions
+     * between session id's generated on different machines. Nodes can may have StreamOutSessions
with the
+     * following contexts:
+     *
+     * <1.1.1.1, (stream_in_flag, 6)>
+     * <1.1.1.1, (stream_out_flag, 6)>
+     *
+     * The first is an out stream created in response to a request from node 1.1.1.1. The
 id (6) was created by
+     * the requesting node. The second is an out stream created by this node to push to 1.1.1.1.
The  id (6) was
+     * created by this node.
+     * @return next StreamOutSession sessionId
+     */
+    private static long nextSessionId()
+    {
+        return (((long)StreamHeader.STREAM_OUT_SOURCE_FLAG << 32) + sessionIdCounter.incrementAndGet());
+    }
 
     public static StreamOutSession create(String table, InetAddress host, Runnable callback)
     {
-        return create(table, host, System.nanoTime(), callback);
+        return create(table, host, nextSessionId(), callback);
     }
 
     public static StreamOutSession create(String table, InetAddress host, long sessionId)
@@ -84,6 +104,11 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber,
IFailur
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
 
+    public int getSourceFlag()
+    {
+        return (int)(context.right >> 32);
+    }
+
     public InetAddress getHost()
     {
         return context.left;
@@ -138,7 +163,7 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber,
IFailur
         // time, if the endpoint die at the exact wrong time for instance.
         if (!isClosed.compareAndSet(false, true))
         {
-            logger.debug("StreamOutSession {} already closed", getSessionId());
+            logger.debug("StreamOutSession {} to {} already closed", getSessionId(), getHost());
             return;
         }
 


Mime
View raw message