cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [1/3] cassandra git commit: Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
Date Wed, 20 May 2015 08:43:55 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 6eea3ea39 -> e4eba2538
  refs/heads/trunk 6ed3c1958 -> eaf7d81e7


Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)

patch by Robert Stupp; reviewed by Aleksey Yeschenko for CASSANDRA-9429


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4eba253
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4eba253
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4eba253

Branch: refs/heads/cassandra-2.2
Commit: e4eba2538255fdc23cad59642be69c8b27f04218
Parents: 6eea3ea
Author: Robert Stupp <snazy@snazy.de>
Authored: Wed May 20 08:48:53 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Wed May 20 08:48:53 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 doc/native_protocol_v4.spec                     |   6 -
 .../apache/cassandra/service/QueryState.java    |  10 +-
 .../apache/cassandra/tracing/TraceState.java    |  42 +---
 .../org/apache/cassandra/tracing/Tracing.java   |  24 +--
 .../apache/cassandra/transport/Connection.java  |   9 -
 .../org/apache/cassandra/transport/Event.java   |  54 -----
 .../org/apache/cassandra/transport/Server.java  |   5 -
 .../transport/messages/BatchMessage.java        |   2 +-
 .../transport/messages/ExecuteMessage.java      |   2 +-
 .../transport/messages/PrepareMessage.java      |   2 +-
 .../transport/messages/QueryMessage.java        |   2 +-
 .../cassandra/tracing/TraceCompleteTest.java    | 204 -------------------
 13 files changed, 19 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4e79ea0..a227f5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.2
  * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
+ * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
 Merged from 2.1:
  * Use configured gcgs in anticompaction (CASSANDRA-9397)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index ba3d3b3..f040323 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -130,8 +130,6 @@ Table of Contents
           ignored by the default QueryHandler implementation.
           Currently, only QUERY, PREPARE, EXECUTE and BATCH requests support
           payload.
-          If both trace-flag and payload-flag are set, the generic key-value
-          payload appears after trace's data.
           Type of custom payload is [bytes map] (see below).
     0x08: Warning flag. The response contains warnings from the server which
           were generated by the server to go along with this response.
@@ -755,9 +753,6 @@ Table of Contents
             - [string] keyspace containing the user defined function / aggregate
             - [string] the function/aggregate name
             - [string list] one string for each argument type (as CQL type)
-    - "TRACE_COMPLETE": notification that a trace session has completed at least
-      on the coordinator. After the event type, the rest of the message will
-      contain the trace session-ID [uuid] as the only argument.
 
   All EVENT messages have a streamId of -1 (Section 2.3).
 
@@ -1162,7 +1157,6 @@ Table of Contents
   * Read_failure error code was added.
   * Function_failure error code was added.
   * Add custom payload to frames for custom QueryHandler implementations (ignored by Cassandra's standard QueryHandler)
-  * Add "TRACE_COMPLETE" event (section 4.2.6).
   * Add warnings to frames for responses for which the server generated a warning during processing, which the client needs to address.
   * Add the date and time data types
   * Add the tinyint and smallint data types

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/service/QueryState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/QueryState.java b/src/java/org/apache/cassandra/service/QueryState.java
index 5e89ac8..ddbc959 100644
--- a/src/java/org/apache/cassandra/service/QueryState.java
+++ b/src/java/org/apache/cassandra/service/QueryState.java
@@ -22,7 +22,6 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.Connection;
 
 /**
  * Represents the state related to a given query.
@@ -77,19 +76,14 @@ public class QueryState
 
     public void createTracingSession()
     {
-        createTracingSession(null);
-    }
-
-    public void createTracingSession(Connection connection)
-    {
         UUID session = this.preparedTracingSession;
         if (session == null)
         {
-            Tracing.instance.newSession(connection);
+            Tracing.instance.newSession();
         }
         else
         {
-            Tracing.instance.newSession(connection, session);
+            Tracing.instance.newSession(session);
             this.preparedTracingSession = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index c029ac7..e882e67 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -35,8 +35,6 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.transport.Connection;
-import org.apache.cassandra.transport.Event;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -60,10 +58,6 @@ public class TraceState implements ProgressEventNotifier
     private final List<ProgressListener> listeners = new CopyOnWriteArrayList<>();
     private String tag;
 
-    private final boolean withFinishEvent;
-    private final AtomicInteger pendingMutations = new AtomicInteger();
-    private final Connection connection;
-
     public enum Status
     {
         IDLE,
@@ -79,24 +73,17 @@ public class TraceState implements ProgressEventNotifier
 
     public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
     {
-        this(coordinator, null, sessionId, traceType, false);
-    }
-
-    public TraceState(InetAddress coordinator, Connection connection, UUID sessionId, Tracing.TraceType traceType, boolean withFinishEvent)
-    {
         assert coordinator != null;
         assert sessionId != null;
 
         this.coordinator = coordinator;
-        this.connection = connection;
         this.sessionId = sessionId;
         sessionIdBytes = ByteBufferUtil.bytes(sessionId);
         this.traceType = traceType;
         this.ttl = traceType.getTTL();
         watch = Stopwatch.createStarted();
         this.status = Status.IDLE;
-        this.withFinishEvent = withFinishEvent;
-    }
+}
 
     /**
      * Activate notification with provided {@code tag} name.
@@ -134,19 +121,6 @@ public class TraceState implements ProgressEventNotifier
     {
         status = Status.STOPPED;
         notifyAll();
-        pushEventIfStopped();
-    }
-
-    private void pushEventIfStopped()
-    {
-        if (status == Status.STOPPED && pendingMutations.get() == 0)
-        {
-            // poor-man's prevention of duplicate tracing-finished events
-            pendingMutations.set(Integer.MIN_VALUE);
-
-            if (connection != null && withFinishEvent)
-                connection.sendIfRegistered(new Event.TraceComplete(sessionId));
-        }
     }
 
     /*
@@ -214,23 +188,13 @@ public class TraceState implements ProgressEventNotifier
         }
     }
 
-    void executeMutation(final Mutation mutation)
+    static void executeMutation(final Mutation mutation)
     {
-        pendingMutations.incrementAndGet();
-
         StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
         {
             protected void runMayThrow() throws Exception
             {
-                try
-                {
-                    mutateWithCatch(mutation);
-                }
-                finally
-                {
-                    if (pendingMutations.decrementAndGet() == 0)
-                        pushEventIfStopped();
-                }
+            mutateWithCatch(mutation);
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 0e49cd0..3f9f54d 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.TimeUUIDType;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.transport.Connection;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
@@ -113,31 +112,26 @@ public class Tracing
         return instance.state.get() != null;
     }
 
-    public UUID newSession(Connection connection)
+    public UUID newSession()
     {
-        return newSession(connection, TraceType.QUERY);
+        return newSession(TraceType.QUERY);
     }
 
     public UUID newSession(TraceType traceType)
     {
-        return newSession(null, traceType);
+        return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType);
     }
 
-    public UUID newSession(Connection connection, TraceType traceType)
+    public UUID newSession(UUID sessionId)
     {
-        return newSession(connection, TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType, false);
+        return newSession(sessionId, TraceType.QUERY);
     }
 
-    public UUID newSession(Connection connection, UUID sessionId)
-    {
-        return newSession(connection, sessionId, TraceType.QUERY, true);
-    }
-
-    private UUID newSession(Connection connection, UUID sessionId, TraceType traceType, boolean withFinishEvent)
+    private UUID newSession(UUID sessionId, TraceType traceType)
     {
         assert state.get() == null;
 
-        TraceState ts = new TraceState(localAddress, connection, sessionId, traceType, withFinishEvent);
+        TraceState ts = new TraceState(localAddress, sessionId, traceType);
         state.set(ts);
         sessions.put(sessionId, ts);
 
@@ -166,7 +160,7 @@ public class Tracing
             final ByteBuffer sessionId = state.sessionIdBytes;
             final int ttl = state.ttl;
 
-            state.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
+            TraceState.executeMutation(TraceKeyspace.makeStopSessionMutation(sessionId, elapsed, ttl));
 
             state.stop();
             sessions.remove(state.sessionId);
@@ -204,7 +198,7 @@ public class Tracing
         final String command = state.traceType.toString();
         final int ttl = state.ttl;
 
-        state.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
+        TraceState.executeMutation(TraceKeyspace.makeStartSessionMutation(sessionId, client, parameters, request, startedAt, command, ttl));
 
         return state;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java
index e2811e9..af26557 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.transport;
 
 import io.netty.channel.Channel;
 import io.netty.util.AttributeKey;
-import org.apache.cassandra.transport.messages.EventMessage;
 
 public class Connection
 {
@@ -65,12 +64,6 @@ public class Connection
         return channel;
     }
 
-    public void sendIfRegistered(Event event)
-    {
-        if (getTracker().isRegistered(event.type, channel))
-            channel.writeAndFlush(new EventMessage(event));
-    }
-
     public interface Factory
     {
         Connection newConnection(Channel channel, int version);
@@ -79,7 +72,5 @@ public class Connection
     public interface Tracker
     {
         void addConnection(Channel ch, Connection connection);
-
-        boolean isRegistered(Event.Type type, Channel ch);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Event.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java
index 16b5f64..9b6fdd4 100644
--- a/src/java/org/apache/cassandra/transport/Event.java
+++ b/src/java/org/apache/cassandra/transport/Event.java
@@ -62,8 +62,6 @@ public abstract class Event
                 return StatusChange.deserializeEvent(cb, version);
             case SCHEMA_CHANGE:
                 return SchemaChange.deserializeEvent(cb, version);
-            case TRACE_COMPLETE:
-                return TraceComplete.deserializeEvent(cb, version);
         }
         throw new AssertionError();
     }
@@ -417,56 +415,4 @@ public abstract class Event
                 && Objects.equal(argTypes, scc.argTypes);
         }
     }
-
-    /**
-     * @since native protocol v4
-     */
-    public static class TraceComplete extends Event
-    {
-        public final UUID traceSessionId;
-
-        public TraceComplete(UUID traceSessionId)
-        {
-            super(Type.TRACE_COMPLETE);
-            this.traceSessionId = traceSessionId;
-        }
-
-        public static Event deserializeEvent(ByteBuf cb, int version)
-        {
-            UUID traceSessionId = CBUtil.readUUID(cb);
-            return new TraceComplete(traceSessionId);
-        }
-
-        protected void serializeEvent(ByteBuf dest, int version)
-        {
-            CBUtil.writeUUID(traceSessionId, dest);
-        }
-
-        protected int eventSerializedSize(int version)
-        {
-            return CBUtil.sizeOfUUID(traceSessionId);
-        }
-
-        @Override
-        public String toString()
-        {
-            return traceSessionId.toString();
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hashCode(traceSessionId);
-        }
-
-        @Override
-        public boolean equals(Object other)
-        {
-            if (!(other instanceof TraceComplete))
-                return false;
-
-            TraceComplete tf = (TraceComplete)other;
-            return Objects.equal(traceSessionId, tf.traceSessionId);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 40a3371..333b956 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -233,11 +233,6 @@ public class Server implements CassandraDaemon.Server
             groups.get(type).add(ch);
         }
 
-        public boolean isRegistered(Event.Type type, Channel ch)
-        {
-            return groups.get(type).contains(ch);
-        }
-
         public void send(Event event)
         {
             groups.get(event.type).writeAndFlush(new EventMessage(event));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 8f144d1..4755ad3 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -165,7 +165,7 @@ public class BatchMessage extends Message.Request
 
             if (state.traceNextQuery())
             {
-                state.createTracingSession(connection);
+                state.createTracingSession();
                 // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
                 Tracing.instance.begin("Execute batch of CQL3 queries", state.getClientAddress(), Collections.<String, String>emptyMap());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 2b21376..3eddc7d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -122,7 +122,7 @@ public class ExecuteMessage extends Message.Request
 
             if (state.traceNextQuery())
             {
-                state.createTracingSession(connection);
+                state.createTracingSession();
 
                 ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
                 if (options.getPageSize() > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index db9e304..f54d1d9 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -71,7 +71,7 @@ public class PrepareMessage extends Message.Request
 
             if (state.traceNextQuery())
             {
-                state.createTracingSession(connection);
+                state.createTracingSession();
                 Tracing.instance.begin("Preparing CQL3 query", state.getClientAddress(), ImmutableMap.of("query", query));
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index fe86a89..4e21678 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -106,7 +106,7 @@ public class QueryMessage extends Message.Request
 
             if (state.traceNextQuery())
             {
-                state.createTracingSession(connection);
+                state.createTracingSession();
 
                 ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
                 builder.put("query", query);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4eba253/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java b/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
deleted file mode 100644
index 8ef7e52..0000000
--- a/test/unit/org/apache/cassandra/tracing/TraceCompleteTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.tracing;
-
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.transport.Event;
-import org.apache.cassandra.transport.Message;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.transport.Server;
-import org.apache.cassandra.transport.SimpleClient;
-import org.apache.cassandra.transport.messages.QueryMessage;
-import org.apache.cassandra.transport.messages.RegisterMessage;
-
-public class TraceCompleteTest extends CQLTester
-{
-    @Test
-    public void testTraceComplete() throws Throwable
-    {
-        sessionNet(3);
-
-        SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
-        clientA.connect(false);
-        try
-        {
-            SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
-            clientA.setEventHandler(eventHandlerA);
-
-            SimpleClient clientB = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
-            clientB.connect(false);
-            try
-            {
-                SimpleClient.SimpleEventHandler eventHandlerB = new SimpleClient.SimpleEventHandler();
-                clientB.setEventHandler(eventHandlerB);
-
-                Message.Response resp = clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
-                Assert.assertSame(Message.Type.READY, resp.type);
-
-                createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
-                QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
-                query.setTracingRequested();
-                resp = clientA.execute(query);
-
-                Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
-                Assert.assertNotNull(event);
-
-                // assert that only the connection that started the trace receives the trace-complete event
-                Assert.assertNull(eventHandlerB.queue.poll(100, TimeUnit.MILLISECONDS));
-
-                Assert.assertSame(Event.Type.TRACE_COMPLETE, event.type);
-                Assert.assertEquals(resp.getTracingId(), ((Event.TraceComplete) event).traceSessionId);
-            }
-            finally
-            {
-                clientB.close();
-            }
-        }
-        finally
-        {
-            clientA.close();
-        }
-    }
-
-    @Test
-    public void testTraceCompleteVersion3() throws Throwable
-    {
-        sessionNet(3);
-
-        SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_3);
-        clientA.connect(false);
-        try
-        {
-            SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
-            clientA.setEventHandler(eventHandlerA);
-
-            try
-            {
-                clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
-                Assert.fail();
-            }
-            catch (RuntimeException e)
-            {
-                Assert.assertTrue(e.getCause() instanceof ProtocolException); // that's what we want
-            }
-
-            createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
-            QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
-            query.setTracingRequested();
-            clientA.execute(query);
-
-            Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
-            Assert.assertNull(event);
-        }
-        finally
-        {
-            clientA.close();
-        }
-    }
-
-    @Test
-    public void testTraceCompleteNotRegistered() throws Throwable
-    {
-        sessionNet(3);
-
-        SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
-        clientA.connect(false);
-        try
-        {
-            SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
-            clientA.setEventHandler(eventHandlerA);
-
-            createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
-            // check that we do NOT receive a trace-complete event, since we didn't register for that
-
-            // with setTracingRequested()
-            QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
-            query.setTracingRequested();
-            clientA.execute(query);
-            // and without setTracingRequested()
-            query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
-            clientA.execute(query);
-
-            Event event = eventHandlerA.queue.poll(250, TimeUnit.MILLISECONDS);
-            Assert.assertNull(event);
-        }
-        finally
-        {
-            clientA.close();
-        }
-    }
-
-    @Test
-    public void testTraceCompleteWithProbability() throws Throwable
-    {
-        sessionNet(3);
-
-        double traceProbability = StorageService.instance.getTraceProbability();
-        // check for trace-probability in QueryState.traceNextQuery() is x<y, not x<=y
-        StorageService.instance.setTraceProbability(1.1d);
-
-        SimpleClient clientA = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
-        clientA.connect(false);
-        try
-        {
-            SimpleClient.SimpleEventHandler eventHandlerA = new SimpleClient.SimpleEventHandler();
-            clientA.setEventHandler(eventHandlerA);
-
-            SimpleClient clientB = new SimpleClient(nativeAddr.getHostAddress(), nativePort);
-            clientB.connect(false);
-            try
-            {
-                SimpleClient.SimpleEventHandler eventHandlerB = new SimpleClient.SimpleEventHandler();
-                clientB.setEventHandler(eventHandlerB);
-
-                Message.Response resp = clientA.execute(new RegisterMessage(Collections.singletonList(Event.Type.TRACE_COMPLETE)));
-                Assert.assertSame(Message.Type.READY, resp.type);
-
-                createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)");
-
-                QueryMessage query = new QueryMessage("SELECT * FROM " + KEYSPACE + '.' + currentTable(), QueryOptions.DEFAULT);
-                clientA.execute(query);
-
-                Event event = eventHandlerA.queue.poll(2000, TimeUnit.MILLISECONDS);
-                Assert.assertNull(event);
-
-                Assert.assertNull(eventHandlerB.queue.poll(100, TimeUnit.MILLISECONDS));
-            }
-            finally
-            {
-                clientB.close();
-            }
-        }
-        finally
-        {
-            StorageService.instance.setTraceProbability(traceProbability);
-            clientA.close();
-        }
-    }
-}


Mime
View raw message