cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/3] git commit: Fix tracing when operation completes before all responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Date Fri, 21 Jun 2013 14:27:52 GMT
Fix tracing when operation completes before all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb

Branch: refs/heads/trunk
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../DebuggableThreadPoolExecutor.java           | 22 ++++++++---
 .../cassandra/concurrent/StageManager.java      | 29 +++++++++------
 .../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
 .../apache/cassandra/net/MessagingService.java  | 14 ++++---
 .../cassandra/net/OutboundTcpConnection.java    | 13 ++++++-
 .../cassandra/service/MigrationManager.java     |  7 +---
 .../cassandra/tracing/ExpiredTraceState.java    | 39 ++++++++++++++++++++
 .../apache/cassandra/tracing/TraceState.java    | 12 ++++--
 .../org/apache/cassandra/tracing/Tracing.java   | 26 +++++++------
 .../service/AntiEntropyServiceTestAbstract.java | 10 ++---
 11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
  * Fix cross-DC mutation forwarding (CASSANDRA-5632)
  * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
  * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
  *   threads and the queue is full, we want the enqueuer to block.  But to allow the number of threads to drop if a
  *   stage is less busy, core thread timeout is enabled.
  */
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
 {
     protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
     public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     protected void onFinalAccept(Runnable task) {}
     protected void onFinalRejection(Runnable task) {}
 
+    public void execute(Runnable command, TraceState state)
+    {
+        super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+    }
+
     // execute does not call newTaskFor
     @Override
     public void execute(Runnable command)
     {
         super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
-                      ? new TraceSessionWrapper<Object>(command, null)
+                      ? new TraceSessionWrapper<Object>(command)
                       : command);
     }
 
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         if (isTracing() && !(runnable instanceof TraceSessionWrapper))
         {
-            return new TraceSessionWrapper<T>(runnable, result);
+            return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
         }
         return super.newTaskFor(runnable, result);
     }
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
     {
         private final TraceState state;
 
-        public TraceSessionWrapper(Runnable runnable, T result)
+        public TraceSessionWrapper(Runnable command)
         {
-            super(runnable, result);
-            state = Tracing.instance().get();
+            this(command, null);
         }
 
         public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
             state = Tracing.instance().get();
         }
 
+        public TraceSessionWrapper(Runnable command, TraceState state)
+        {
+            super(command, null);
+            this.state = state;
+        }
+
         private void setupContext()
         {
             Tracing.instance().set(state);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
 {
     private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
 
-    private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+    private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
 
     public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
 
@@ -60,7 +61,7 @@ public class StageManager
         stages.put(Stage.TRACING, tracingExecutor());
     }
 
-    private static ThreadPoolExecutor tracingExecutor()
+    private static ExecuteOnlyExecutor tracingExecutor()
     {
         RejectedExecutionHandler reh = new RejectedExecutionHandler()
         {
@@ -78,7 +79,7 @@ public class StageManager
                                        reh);
     }
 
-    private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+    private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
     {
         return new JMXEnabledThreadPoolExecutor(numThreads,
                                                 KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
                                                 stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
                                                      stage.getJmxType());
     }
 
-    private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+    private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
     {
         return new JMXConfigurableThreadPoolExecutor(numThreads,
                                                      KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
     /**
      * Retrieve a stage from the StageManager
      * @param stage name of the stage to be retrieved.
-    */
-    public static ThreadPoolExecutor getStage(Stage stage)
+     */
+    public static TracingAwareExecutorService getStage(Stage stage)
     {
         return stages.get(stage);
     }
@@ -132,29 +133,35 @@ public class StageManager
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
      * tracing stage.  See CASSANDRA-1123 for background.
      */
-    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+    private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
     {
         public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
         {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
         }
 
+        public void execute(Runnable command, TraceState state)
+        {
+            assert state == null;
+            super.execute(command);
+        }
+
         @Override
         public Future<?> submit(Runnable task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Runnable task, T result)
         {
-            return super.submit(task, result);
+            throw new UnsupportedOperationException();
         }
 
         @Override
         public <T> Future<T> submit(Callable<T> task)
         {
-            return super.submit(task);
+            throw new UnsupportedOperationException();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+    // we need a way to inject a TraceState directly into the Executor context without going through
+    // the global Tracing sessions; see CASSANDRA-5668
+    public void execute(Runnable command, TraceState state);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
 
     public void receive(MessageIn message, String id, long timestamp)
     {
-        Tracing.instance().initializeFromMessage(message);
-        Tracing.trace("Message received from {}", message.from);
+        TraceState state = Tracing.instance().initializeFromMessage(message);
+        if (state != null)
+            state.trace("Message received from {}", message.from);
 
         message = SinkManager.processInboundMessage(message, id);
         if (message == null)
             return;
 
         Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
-        ExecutorService stage = StageManager.getStage(message.getMessageType());
+        TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.verb;
 
         if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
             }
         }
 
-        stage.execute(runnable);
+        stage.execute(runnable, state);
     }
 
     public void setCallbackForTests(String messageId, CallbackInfo callback)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
             {
                 UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
                 TraceState state = Tracing.instance().get(sessionId);
-                state.trace("Sending message to {}", poolReference.endPoint());
-                Tracing.instance().stopIfNonLocal(state);
+                String message = String.format("Sending message to %s", poolReference.endPoint());
+                // session may have already finished; see CASSANDRA-5668
+                if (state == null)
+                {
+                    TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+                }
+                else
+                {
+                    state.trace(message);
+                    Tracing.instance().stopIfNonLocal(state);
+                }
             }
 
             write(qm.message, qm.id, qm.timestamp, out, targetVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
 
     public static boolean isReadyForBootstrap()
     {
-        return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+        return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
     }
 
     public void notifyCreateKeyspace(KSMetaData ksm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+    public ExpiredTraceState(UUID sessionId)
+    {
+        super(FBUtilities.getBroadcastAddress(), sessionId, true);
+    }
+
+    public int elapsed()
+    {
+        return -1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
         trace(MessageFormatter.arrayFormat(format, args).getMessage());
     }
 
-    public void trace(final String message)
+    public void trace(String message)
     {
-        final int elapsed = elapsed();
-        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+        TraceState.trace(sessionIdBytes, message, elapsed());
+    }
 
+    public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+    {
+        final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
         final String threadName = Thread.currentThread().getName();
 
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
                 ColumnFamily cf = ColumnFamily.create(cfMeta);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
-                Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+                if (elapsed >= 0)
+                    Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
                 Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
                 RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
                 mutation.add(cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
     }
 
     /**
-     * Updates the threads query context from a message
+     * Determines the tracing context from a message.  Does NOT set the threadlocal state.
      * 
-     * @param message
-     *            The internode message
+     * @param message The internode message
      */
-    public void initializeFromMessage(final MessageIn<?> message)
+    public TraceState initializeFromMessage(final MessageIn<?> message)
     {
         final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
 
-        // if the message has no session context header don't do tracing
         if (sessionBytes == null)
-        {
-            state.set(null);
-            return;
-        }
+            return null;
 
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = sessions.get(sessionId);
-        if (ts == null)
+        if (ts != null)
+            return ts;
+
+        if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+        {
+            // received a message for a session we've already closed out.  see CASSANDRA-5668
+            return new ExpiredTraceState(sessionId);
+        }
+        else
         {
             ts = new TraceState(message.from, sessionId, false);
             sessions.put(sessionId, ts);
+            return ts;
         }
-        state.set(ts);
     }
 
     public static void trace(String message)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
 
 import java.net.InetAddress;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
+import static org.apache.cassandra.service.AntiEntropyService.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
 
     void flushAES() throws Exception
     {
-        final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+        final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
         final Callable noop = new Callable<Object>()
         {
             public Boolean call()


Mime
View raw message