cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stefa...@apache.org
Subject [2/3] cassandra git commit: Fix crossNode value when receiving messages
Date Thu, 03 Nov 2016 08:10:47 GMT
Fix crossNode value when receiving messages

patch by Sylvain Lebresne; reviewed by Stefania Alborghetti for CASSANDRA-12791


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d9a1a12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d9a1a12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d9a1a12

Branch: refs/heads/trunk
Commit: 9d9a1a12248eb37affd0cb131c8aa4f658c3bcc9
Parents: d2d7299
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Thu Oct 27 11:23:18 2016 +0800
Committer: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Committed: Thu Nov 3 16:09:34 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/ReadCommandVerbHandler.java    |  4 +-
 .../db/monitoring/ConstructionTime.java         | 41 -----------
 .../cassandra/db/monitoring/Monitorable.java    |  3 +-
 .../db/monitoring/MonitorableImpl.java          | 22 ++++--
 .../cassandra/db/monitoring/MonitoringTask.java | 10 +--
 .../cassandra/net/IncomingTcpConnection.java    |  2 +-
 .../cassandra/net/MessageDeliveryTask.java      |  4 +-
 .../org/apache/cassandra/net/MessageIn.java     | 71 +++++++++++++-------
 .../apache/cassandra/net/MessagingService.java  | 44 ++++++------
 .../apache/cassandra/service/ReadCallback.java  |  3 +-
 .../apache/cassandra/service/StorageProxy.java  |  3 +-
 .../db/monitoring/MonitoringTaskTest.java       | 56 ++++++++-------
 .../org/apache/cassandra/hints/HintTest.java    |  4 +-
 .../cassandra/hints/HintsServiceTest.java       |  3 +-
 .../apache/cassandra/net/MatcherResponse.java   |  2 +-
 .../cassandra/net/MessagingServiceTest.java     | 16 ++---
 .../cassandra/net/MockMessagingServiceTest.java |  6 +-
 .../cassandra/service/DataResolverTest.java     |  3 +-
 19 files changed, 147 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 25facd5..40aace3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Fix crossNode value when receiving messages (CASSANDRA-12791)
  * Don't load MX4J beans twice (CASSANDRA-12869)
  * Extend native protocol request flags, add versions to SUPPORTED, and introduce ProtocolVersion enum (CASSANDRA-12838)
  * Set JOINING mode when running pre-join tasks (CASSANDRA-12836)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 7948590..a71e92d 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -41,7 +41,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         }
 
         ReadCommand command = message.payload;
-        command.setMonitoringTime(message.constructionTime, message.getTimeout(), message.getSlowQueryTimeout());
+        command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout());
 
         ReadResponse response;
         try (ReadExecutionController executionController = command.executionController();
@@ -53,7 +53,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         if (!command.complete())
         {
             Tracing.trace("Discarding partial response to {} (timed out)", message.from);
-            MessagingService.instance().incrementDroppedMessages(message, System.currentTimeMillis() - message.constructionTime.timestamp);
+            MessagingService.instance().incrementDroppedMessages(message, message.getLifetimeInMS());
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java b/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
deleted file mode 100644
index d6b6078..0000000
--- a/src/java/org/apache/cassandra/db/monitoring/ConstructionTime.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.monitoring;
-
-public final class ConstructionTime
-{
-    public final long timestamp;
-    public final boolean isCrossNode;
-
-    public ConstructionTime()
-    {
-        this(ApproximateTime.currentTimeMillis());
-    }
-
-    public ConstructionTime(long timestamp)
-    {
-        this(timestamp, false);
-    }
-
-    public ConstructionTime(long timestamp, boolean isCrossNode)
-    {
-        this.timestamp = timestamp;
-        this.isCrossNode = isCrossNode;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
index f4c5ee8..c9bf94e 100644
--- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
+++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.db.monitoring;
 public interface Monitorable
 {
     String name();
-    ConstructionTime constructionTime();
+    long constructionTime();
     long timeout();
     long slowTimeout();
 
@@ -29,6 +29,7 @@ public interface Monitorable
     boolean isAborted();
     boolean isCompleted();
     boolean isSlow();
+    boolean isCrossNode();
 
     boolean abort();
     boolean complete();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
index 7363e10..48c8152 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitorableImpl.java
@@ -22,9 +22,10 @@ public abstract class MonitorableImpl implements Monitorable
 {
     private MonitoringState state;
     private boolean isSlow;
-    private ConstructionTime constructionTime;
+    private long constructionTime = -1;
     private long timeout;
     private long slowTimeout;
+    private boolean isCrossNode;
 
     protected MonitorableImpl()
     {
@@ -37,14 +38,16 @@ public abstract class MonitorableImpl implements Monitorable
      * is too complex, it would require passing new parameters to all serializers
      * or specializing the serializers to accept these message properties.
      */
-    public void setMonitoringTime(ConstructionTime constructionTime, long timeout, long slowTimeout)
+    public void setMonitoringTime(long constructionTime, boolean isCrossNode, long timeout, long slowTimeout)
     {
+        assert constructionTime >= 0;
         this.constructionTime = constructionTime;
+        this.isCrossNode = isCrossNode;
         this.timeout = timeout;
         this.slowTimeout = slowTimeout;
     }
 
-    public ConstructionTime constructionTime()
+    public long constructionTime()
     {
         return constructionTime;
     }
@@ -54,6 +57,11 @@ public abstract class MonitorableImpl implements Monitorable
         return timeout;
     }
 
+    public boolean isCrossNode()
+    {
+        return isCrossNode;
+    }
+
     public long slowTimeout()
     {
         return slowTimeout;
@@ -87,7 +95,7 @@ public abstract class MonitorableImpl implements Monitorable
     {
         if (state == MonitoringState.IN_PROGRESS)
         {
-            if (constructionTime != null)
+            if (constructionTime >= 0)
                 MonitoringTask.addFailedOperation(this, ApproximateTime.currentTimeMillis());
 
             state = MonitoringState.ABORTED;
@@ -101,7 +109,7 @@ public abstract class MonitorableImpl implements Monitorable
     {
         if (state == MonitoringState.IN_PROGRESS)
         {
-            if (isSlow && slowTimeout > 0 && constructionTime != null)
+            if (isSlow && slowTimeout > 0 && constructionTime >= 0)
                 MonitoringTask.addSlowOperation(this, ApproximateTime.currentTimeMillis());
 
             state = MonitoringState.COMPLETED;
@@ -113,10 +121,10 @@ public abstract class MonitorableImpl implements Monitorable
 
     private void check()
     {
-        if (constructionTime == null || state != MonitoringState.IN_PROGRESS)
+        if (constructionTime < 0 || state != MonitoringState.IN_PROGRESS)
             return;
 
-        long elapsed = ApproximateTime.currentTimeMillis() - constructionTime.timestamp;
+        long elapsed = ApproximateTime.currentTimeMillis() - constructionTime;
 
         if (elapsed >= slowTimeout && !isSlow)
             isSlow = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index b116485..9426042 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -330,7 +330,7 @@ class MonitoringTask
         {
             this.operation = operation;
             numTimesReported = 1;
-            totalTime = failedAt - operation.constructionTime().timestamp;
+            totalTime = failedAt - operation.constructionTime();
             minTime = totalTime;
             maxTime = totalTime;
         }
@@ -370,7 +370,7 @@ class MonitoringTask
                                      name(),
                                      totalTime,
                                      operation.timeout(),
-                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
             else
                 return String.format("<%s> timed out %d times, avg/min/max %d/%d/%d msec, timeout %d %s",
                                      name(),
@@ -379,7 +379,7 @@ class MonitoringTask
                                      minTime,
                                      maxTime,
                                      operation.timeout(),
-                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
         }
     }
 
@@ -400,7 +400,7 @@ class MonitoringTask
                                      name(),
                                      totalTime,
                                      operation.slowTimeout(),
-                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
             else
                 return String.format("<%s>, was slow %d times: avg/min/max %d/%d/%d msec - slow timeout %d %s",
                                      name(),
@@ -409,7 +409,7 @@ class MonitoringTask
                                      minTime,
                                      maxTime,
                                      operation.slowTimeout(),
-                                     operation.constructionTime().isCrossNode ? "msec/cross-node" : "msec");
+                                     operation.isCrossNode() ? "msec/cross-node" : "msec");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index cd80e00..7d3c607 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -188,7 +188,7 @@ public class IncomingTcpConnection extends FastThreadLocalThread implements Clos
         else
             id = input.readInt();
 
-        MessageIn message = MessageIn.read(input, version, id, MessageIn.readTimestamp(from, input, System.currentTimeMillis()));
+        MessageIn message = MessageIn.read(input, version, id, MessageIn.readConstructionTime(from, input));
         if (message == null)
         {
             // callback expired; nothing to do

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index c97a98f..c91e9da 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -46,7 +46,7 @@ public class MessageDeliveryTask implements Runnable
     public void run()
     {
         MessagingService.Verb verb = message.verb;
-        long timeTaken = System.currentTimeMillis() - message.constructionTime.timestamp;
+        long timeTaken = message.getLifetimeInMS();
         if (MessagingService.DROPPABLE_VERBS.contains(verb)
             && timeTaken > message.getTimeout())
         {
@@ -82,7 +82,7 @@ public class MessageDeliveryTask implements Runnable
         }
 
         if (GOSSIP_VERBS.contains(message.verb))
-            Gossiper.instance.setLastProcessedMessageAt(message.constructionTime.timestamp);
+            Gossiper.instance.setLastProcessedMessageAt(message.constructionTime);
     }
 
     private void handleFailure(Throwable t)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessageIn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageIn.java b/src/java/org/apache/cassandra/net/MessageIn.java
index 0562df6..a254741 100644
--- a/src/java/org/apache/cassandra/net/MessageIn.java
+++ b/src/java/org/apache/cassandra/net/MessageIn.java
@@ -26,7 +26,7 @@ import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.monitoring.ConstructionTime;
+import org.apache.cassandra.db.monitoring.ApproximateTime;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -39,14 +39,14 @@ public class MessageIn<T>
     public final Map<String, byte[]> parameters;
     public final MessagingService.Verb verb;
     public final int version;
-    public final ConstructionTime constructionTime;
+    public final long constructionTime;
 
     private MessageIn(InetAddress from,
                       T payload,
                       Map<String, byte[]> parameters,
                       MessagingService.Verb verb,
                       int version,
-                      ConstructionTime constructionTime)
+                      long constructionTime)
     {
         this.from = from;
         this.payload = payload;
@@ -61,17 +61,26 @@ public class MessageIn<T>
                                           Map<String, byte[]> parameters,
                                           MessagingService.Verb verb,
                                           int version,
-                                          ConstructionTime constructionTime)
+                                          long constructionTime)
     {
         return new MessageIn<>(from, payload, parameters, verb, version, constructionTime);
     }
 
+    public static <T> MessageIn<T> create(InetAddress from,
+                                          T payload,
+                                          Map<String, byte[]> parameters,
+                                          MessagingService.Verb verb,
+                                          int version)
+    {
+        return new MessageIn<>(from, payload, parameters, verb, version, ApproximateTime.currentTimeMillis());
+    }
+
     public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id) throws IOException
     {
-        return read(in, version, id, new ConstructionTime());
+        return read(in, version, id, ApproximateTime.currentTimeMillis());
     }
 
-    public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, ConstructionTime constructionTime) throws IOException
+    public static <T2> MessageIn<T2> read(DataInputPlus in, int version, int id, long constructionTime) throws IOException
     {
         InetAddress from = CompactEndpointSerializationHelper.deserialize(in);
 
@@ -115,28 +124,42 @@ public class MessageIn<T>
         return MessageIn.create(from, payload, parameters, verb, version, constructionTime);
     }
 
-    public static ConstructionTime createTimestamp()
+    public static long readConstructionTime(InetAddress from, DataInputPlus input) throws IOException
     {
-        return new ConstructionTime();
+        long currentTime = ApproximateTime.currentTimeMillis();
+
+        // Reconstruct the message construction time sent by the remote host (we sent only the lower 4 bytes, assuming the
+        // higher 4 bytes wouldn't change between the sender and receiver)
+        int partial = input.readInt(); // make sure to readInt, even if cross_node_to is not enabled
+        long sentConstructionTime = (currentTime & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
+
+        // Because nodes may not have their clock perfectly in sync, it's actually possible the sentConstructionTime is
+        // later than the currentTime (the received time). If that's the case, as we definitively know there is a lack
+        // of proper synchronziation of the clock, we ignore sentConstructionTime. We also ignore that
+        // sentConstructionTime if we're told to.
+        long elapsed = currentTime - sentConstructionTime;
+        if (elapsed > 0)
+            MessagingService.instance().metrics.addTimeTaken(from, elapsed);
+
+        boolean useSentTime = DatabaseDescriptor.hasCrossNodeTimeout() && elapsed > 0;
+        return useSentTime ? sentConstructionTime : currentTime;
     }
 
-    public static ConstructionTime readTimestamp(InetAddress from, DataInputPlus input, long timestamp) throws IOException
+    /**
+     * Since how long (in milliseconds) the message has lived.
+     */
+    public long getLifetimeInMS()
     {
-        // make sure to readInt, even if cross_node_to is not enabled
-        int partial = input.readInt();
-        long crossNodeTimestamp = (timestamp & 0xFFFFFFFF00000000L) | (((partial & 0xFFFFFFFFL) << 2) >> 2);
-        if (timestamp > crossNodeTimestamp)
-        {
-            MessagingService.instance().metrics.addTimeTaken(from, timestamp - crossNodeTimestamp);
-        }
-        if(DatabaseDescriptor.hasCrossNodeTimeout())
-        {
-            return new ConstructionTime(crossNodeTimestamp, timestamp != crossNodeTimestamp);
-        }
-        else
-        {
-            return new ConstructionTime();
-        }
+        return ApproximateTime.currentTimeMillis() - constructionTime;
+    }
+
+    /**
+     * Whether the message has crossed the node boundary, that is whether it originated from another node.
+     *
+     */
+    public boolean isCrossNode()
+    {
+        return !from.equals(DatabaseDescriptor.getBroadcastAddress());
     }
 
     public Stage getMessageType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index ba40a58..f82e80b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -426,14 +426,14 @@ public final class MessagingService implements MessagingServiceMBean
     private static final class DroppedMessages
     {
         final DroppedMessageMetrics metrics;
-        final AtomicInteger droppedInternalTimeout;
-        final AtomicInteger droppedCrossNodeTimeout;
+        final AtomicInteger droppedInternal;
+        final AtomicInteger droppedCrossNode;
 
         DroppedMessages(Verb verb)
         {
             this.metrics = new DroppedMessageMetrics(verb);
-            this.droppedInternalTimeout = new AtomicInteger(0);
-            this.droppedCrossNodeTimeout = new AtomicInteger(0);
+            this.droppedInternal = new AtomicInteger(0);
+            this.droppedCrossNode = new AtomicInteger(0);
         }
 
     }
@@ -1160,19 +1160,19 @@ public final class MessagingService implements MessagingServiceMBean
         {
             updateDroppedMutationCount((IMutation) message.payload);
         }
-        incrementDroppedMessages(message.verb, timeTaken, message.constructionTime.isCrossNode);
+        incrementDroppedMessages(message.verb, timeTaken, message.isCrossNode());
     }
 
-    public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNodeTimeout)
+    public void incrementDroppedMessages(Verb verb, long timeTaken, boolean isCrossNode)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
-        incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNodeTimeout);
+        incrementDroppedMessages(droppedMessagesMap.get(verb), timeTaken, isCrossNode);
     }
 
-    public void incrementDroppedMessages(Verb verb, boolean isCrossNodeTimeout)
+    public void incrementDroppedMessages(Verb verb, boolean isCrossNode)
     {
         assert DROPPABLE_VERBS.contains(verb) : "Verb " + verb + " should not legally be dropped";
-        incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNodeTimeout);
+        incrementDroppedMessages(droppedMessagesMap.get(verb), isCrossNode);
     }
 
     private void updateDroppedMutationCount(IMutation mutation)
@@ -1189,22 +1189,22 @@ public final class MessagingService implements MessagingServiceMBean
         }
     }
 
-    private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNodeTimeout)
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, long timeTaken, boolean isCrossNode)
     {
-        if (isCrossNodeTimeout)
+        if (isCrossNode)
             droppedMessages.metrics.crossNodeDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
         else
             droppedMessages.metrics.internalDroppedLatency.update(timeTaken, TimeUnit.MILLISECONDS);
-        incrementDroppedMessages(droppedMessages, isCrossNodeTimeout);
+        incrementDroppedMessages(droppedMessages, isCrossNode);
     }
 
-    private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNodeTimeout)
+    private void incrementDroppedMessages(DroppedMessages droppedMessages, boolean isCrossNode)
     {
         droppedMessages.metrics.dropped.mark();
-        if (isCrossNodeTimeout)
-            droppedMessages.droppedCrossNodeTimeout.incrementAndGet();
+        if (isCrossNode)
+            droppedMessages.droppedCrossNode.incrementAndGet();
         else
-            droppedMessages.droppedInternalTimeout.incrementAndGet();
+            droppedMessages.droppedInternal.incrementAndGet();
     }
 
     private void logDroppedMessages()
@@ -1226,16 +1226,16 @@ public final class MessagingService implements MessagingServiceMBean
             Verb verb = entry.getKey();
             DroppedMessages droppedMessages = entry.getValue();
 
-            int droppedInternalTimeout = droppedMessages.droppedInternalTimeout.getAndSet(0);
-            int droppedCrossNodeTimeout = droppedMessages.droppedCrossNodeTimeout.getAndSet(0);
-            if (droppedInternalTimeout > 0 || droppedCrossNodeTimeout > 0)
+            int droppedInternal = droppedMessages.droppedInternal.getAndSet(0);
+            int droppedCrossNode = droppedMessages.droppedCrossNode.getAndSet(0);
+            if (droppedInternal > 0 || droppedCrossNode > 0)
             {
-                ret.add(String.format("%s messages were dropped in last %d ms: %d for internal timeout and %d for cross node timeout."
+                ret.add(String.format("%s messages were dropped in last %d ms: %d internal and %d cross node."
                                      + " Mean internal dropped latency: %d ms and Mean cross-node dropped latency: %d ms",
                                      verb,
                                      LOG_DROPPED_INTERVAL_IN_MS,
-                                     droppedInternalTimeout,
-                                     droppedCrossNodeTimeout,
+                                     droppedInternal,
+                                     droppedCrossNode,
                                      TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.internalDroppedLatency.getSnapshot().getMean()),
                                      TimeUnit.NANOSECONDS.toMillis((long)droppedMessages.metrics.crossNodeDroppedLatency.getSnapshot().getMean())));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index b5cb477..11c0b12 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -198,8 +198,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                                                            result,
                                                            Collections.<String, byte[]>emptyMap(),
                                                            MessagingService.Verb.INTERNAL_RESPONSE,
-                                                           MessagingService.current_version,
-                                                           MessageIn.createTimestamp());
+                                                           MessagingService.current_version);
         response(message);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 529e4e3..e0be68c 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -52,7 +52,6 @@ import org.apache.cassandra.config.SchemaConstants;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.monitoring.ConstructionTime;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
@@ -1876,7 +1875,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             try
             {
-                command.setMonitoringTime(new ConstructionTime(constructionTime), verb.getTimeout(), DatabaseDescriptor.getSlowQueryTimeout());
+                command.setMonitoringTime(constructionTime, false, verb.getTimeout(), DatabaseDescriptor.getSlowQueryTimeout());
 
                 ReadResponse response;
                 try (ReadExecutionController executionController = command.executionController();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
index 14659e3..acc988f 100644
--- a/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
+++ b/test/unit/org/apache/cassandra/db/monitoring/MonitoringTaskTest.java
@@ -65,10 +65,10 @@ public class MonitoringTaskTest
     {
         private final String name;
 
-        TestMonitor(String name, ConstructionTime constructionTime, long timeout, long slow)
+        TestMonitor(String name, long timestamp, boolean isCrossNode, long timeout, long slow)
         {
             this.name = name;
-            setMonitoringTime(constructionTime, timeout, slow);
+            setMonitoringTime(timestamp, isCrossNode, timeout, slow);
         }
 
         public String name()
@@ -124,7 +124,7 @@ public class MonitoringTaskTest
     @Test
     public void testAbort() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test abort", System.currentTimeMillis(), false, timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.isAborted());
@@ -135,7 +135,7 @@ public class MonitoringTaskTest
     @Test
     public void testAbortIdemPotent() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test abort", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test abort", System.currentTimeMillis(), false, timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.abort());
@@ -148,7 +148,7 @@ public class MonitoringTaskTest
     @Test
     public void testAbortCrossNode() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test for cross node", new ConstructionTime(System.currentTimeMillis(), true), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test for cross node", System.currentTimeMillis(), true, timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.isAborted());
@@ -159,7 +159,7 @@ public class MonitoringTaskTest
     @Test
     public void testComplete() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test complete", System.currentTimeMillis(), false, timeout, slowTimeout);
         operation.complete();
         waitForOperationsToComplete(operation);
 
@@ -171,7 +171,7 @@ public class MonitoringTaskTest
     @Test
     public void testCompleteIdemPotent() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test complete", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test complete", System.currentTimeMillis(), false, timeout, slowTimeout);
         operation.complete();
         waitForOperationsToComplete(operation);
 
@@ -185,7 +185,7 @@ public class MonitoringTaskTest
     @Test
     public void testReportSlow() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test report slow", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test report slow", System.currentTimeMillis(), false, timeout, slowTimeout);
         waitForOperationsToBeReportedAsSlow(operation);
 
         assertTrue(operation.isSlow());
@@ -199,7 +199,7 @@ public class MonitoringTaskTest
     public void testNoReportSlowIfZeroSlowTimeout() throws InterruptedException
     {
         // when the slow timeout is set to zero then operation won't be reported as slow
-        Monitorable operation = new TestMonitor("Test report slow disabled", new ConstructionTime(System.currentTimeMillis()), timeout, 0);
+        Monitorable operation = new TestMonitor("Test report slow disabled", System.currentTimeMillis(), false, timeout, 0);
         waitForOperationsToBeReportedAsSlow(operation);
 
         assertTrue(operation.isSlow());
@@ -212,7 +212,7 @@ public class MonitoringTaskTest
     @Test
     public void testReport() throws InterruptedException
     {
-        Monitorable operation = new TestMonitor("Test report", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+        Monitorable operation = new TestMonitor("Test report", System.currentTimeMillis(), false, timeout, slowTimeout);
         waitForOperationsToComplete(operation);
 
         assertTrue(operation.isSlow());
@@ -233,13 +233,13 @@ public class MonitoringTaskTest
         MonitoringTask.instance = MonitoringTask.make(10, -1);
         try
         {
-            Monitorable operation1 = new TestMonitor("Test report 1", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+            Monitorable operation1 = new TestMonitor("Test report 1", System.currentTimeMillis(), false, timeout, slowTimeout);
             waitForOperationsToComplete(operation1);
 
             assertTrue(operation1.isAborted());
             assertFalse(operation1.isCompleted());
 
-            Monitorable operation2 = new TestMonitor("Test report 2", new ConstructionTime(System.currentTimeMillis()), timeout, slowTimeout);
+            Monitorable operation2 = new TestMonitor("Test report 2", System.currentTimeMillis(), false, timeout, slowTimeout);
             waitForOperationsToBeReportedAsSlow(operation2);
 
             operation2.complete();
@@ -266,7 +266,7 @@ public class MonitoringTaskTest
         for (int i = 0; i < opCount; i++)
         {
             executorService.submit(() ->
-                operations.add(new TestMonitor(UUID.randomUUID().toString(), new ConstructionTime(), timeout, slowTimeout))
+                operations.add(new TestMonitor(UUID.randomUUID().toString(), System.currentTimeMillis(), false, timeout, slowTimeout))
             );
         }
 
@@ -311,13 +311,17 @@ public class MonitoringTaskTest
                         for (int j = 0; j < numTimes; j++)
                         {
                             Monitorable operation1 = new TestMonitor(operationName,
-                                                                    new ConstructionTime(System.currentTimeMillis()),
-                                                                    timeout, slowTimeout);
+                                                                     System.currentTimeMillis(),
+                                                                     false,
+                                                                     timeout,
+                                                                     slowTimeout);
                             waitForOperationsToComplete(operation1);
 
                             Monitorable operation2 = new TestMonitor(operationName,
-                                                                     new ConstructionTime(System.currentTimeMillis()),
-                                                                     timeout, slowTimeout);
+                                                                     System.currentTimeMillis(),
+                                                                     false,
+                                                                     timeout,
+                                                                     slowTimeout);
                             waitForOperationsToBeReportedAsSlow(operation2);
                             operation2.complete();
                         }
@@ -362,8 +366,10 @@ public class MonitoringTaskTest
                 try
                 {
                     Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName failed",
-                                                            new ConstructionTime(System.currentTimeMillis()),
-                                                            timeout, slowTimeout);
+                                                            System.currentTimeMillis(),
+                                                            false,
+                                                            timeout,
+                                                            slowTimeout);
                     operations.add(operation);
                 }
                 finally
@@ -394,8 +400,10 @@ public class MonitoringTaskTest
                 try
                 {
                     Monitorable operation = new TestMonitor("Test testMultipleThreadsSameName slow",
-                                                            new ConstructionTime(System.currentTimeMillis()),
-                                                            timeout, slowTimeout);
+                                                            System.currentTimeMillis(),
+                                                            false,
+                                                            timeout,
+                                                            slowTimeout);
                     operations.add(operation);
                 }
                 finally
@@ -428,8 +436,10 @@ public class MonitoringTaskTest
                 try
                 {
                     Monitorable operation = new TestMonitor("Test thread " + Thread.currentThread().getName(),
-                                                            new ConstructionTime(System.currentTimeMillis()),
-                                                            timeout, slowTimeout);
+                                                            System.currentTimeMillis(),
+                                                            false,
+                                                            timeout,
+                                                            slowTimeout);
                     operations.add(operation);
                     operation.complete();
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/hints/HintTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintTest.java b/test/unit/org/apache/cassandra/hints/HintTest.java
index 4cc2188..e4a33fd 100644
--- a/test/unit/org/apache/cassandra/hints/HintTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintTest.java
@@ -232,7 +232,7 @@ public class HintTest
         // Process hint message.
         HintMessage message = new HintMessage(localId, hint);
         MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
-                MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version, MessageIn.createTimestamp()),
+                MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
                 -1);
 
         // hint should not be applied as we no longer are a replica
@@ -277,7 +277,7 @@ public class HintTest
             // Process hint message.
             HintMessage message = new HintMessage(localId, hint);
             MessagingService.instance().getVerbHandler(MessagingService.Verb.HINT).doVerb(
-                    MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version, MessageIn.createTimestamp()),
+                    MessageIn.create(local, message, Collections.emptyMap(), MessagingService.Verb.HINT, MessagingService.current_version),
                     -1);
 
             // hint should not be applied as we no longer are a replica

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
index ffb7f73..077a9d1 100644
--- a/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsServiceTest.java
@@ -184,8 +184,7 @@ public class HintsServiceTest
                 HintResponse.instance,
                 Collections.emptyMap(),
                 MessagingService.Verb.REQUEST_RESPONSE,
-                MessagingService.current_version,
-                MessageIn.createTimestamp());
+                MessagingService.current_version);
 
         MockMessagingSpy spy;
         if (noOfResponses != -1)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MatcherResponse.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MatcherResponse.java b/test/unit/org/apache/cassandra/net/MatcherResponse.java
index c8984eb..21a75c9 100644
--- a/test/unit/org/apache/cassandra/net/MatcherResponse.java
+++ b/test/unit/org/apache/cassandra/net/MatcherResponse.java
@@ -106,7 +106,7 @@ public class MatcherResponse
                     if (payload == null)
                         return null;
                     else
-                        return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version, MessageIn.createTimestamp());
+                        return MessageIn.create(to, payload, Collections.emptyMap(), verb, MessagingService.current_version);
                 },
                 limit);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
index 2a3ecbe..ec27b7e 100644
--- a/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MessagingServiceTest.java
@@ -82,8 +82,8 @@ public class MessagingServiceTest
 
         List<String> logs = messagingService.getDroppedMessagesLogs();
         assertEquals(1, logs.size());
-        assertEquals("READ messages were dropped in last 5000 ms: 2500 for internal timeout and 2500 for cross node timeout. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0));
-        assertEquals(5000, (int)messagingService.getDroppedMessages().get(verb.toString()));
+        assertEquals("READ messages were dropped in last 5000 ms: 2500 internal and 2500 cross node. Mean internal dropped latency: 2730 ms and Mean cross-node dropped latency: 2731 ms", logs.get(0));
+        assertEquals(5000, (int) messagingService.getDroppedMessages().get(verb.toString()));
 
         logs = messagingService.getDroppedMessagesLogs();
         assertEquals(0, logs.size());
@@ -92,8 +92,8 @@ public class MessagingServiceTest
             messagingService.incrementDroppedMessages(verb, i, i % 2 == 0);
 
         logs = messagingService.getDroppedMessagesLogs();
-        assertEquals("READ messages were dropped in last 5000 ms: 1250 for internal timeout and 1250 for cross node timeout. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0));
-        assertEquals(7500, (int)messagingService.getDroppedMessages().get(verb.toString()));
+        assertEquals("READ messages were dropped in last 5000 ms: 1250 internal and 1250 cross node. Mean internal dropped latency: 2277 ms and Mean cross-node dropped latency: 2278 ms", logs.get(0));
+        assertEquals(7500, (int) messagingService.getDroppedMessages().get(verb.toString()));
     }
 
     @Test
@@ -108,7 +108,7 @@ public class MessagingServiceTest
         long sentAt = now - latency;
 
         assertNull(dcLatency.get("datacenter1"));
-        addDCLatency(sentAt, now);
+        addDCLatency(sentAt);
         assertNotNull(dcLatency.get("datacenter1"));
         assertEquals(1, dcLatency.get("datacenter1").getCount());
         long expectedBucket = bucketOffsets[Math.abs(Arrays.binarySearch(bucketOffsets, TimeUnit.MILLISECONDS.toNanos(latency))) - 1];
@@ -128,7 +128,7 @@ public class MessagingServiceTest
         long sentAt = now - latency;
 
         assertNull(dcLatency.get("datacenter1"));
-        addDCLatency(sentAt, now);
+        addDCLatency(sentAt);
         assertNull(dcLatency.get("datacenter1"));
     }
 
@@ -221,7 +221,7 @@ public class MessagingServiceTest
         assertFalse(MockBackPressureStrategy.applied);
     }
 
-    private static void addDCLatency(long sentAt, long now) throws IOException
+    private static void addDCLatency(long sentAt) throws IOException
     {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try (DataOutputStreamPlus out = new WrappedDataOutputStreamPlus(baos))
@@ -229,7 +229,7 @@ public class MessagingServiceTest
             out.writeInt((int) sentAt);
         }
         DataInputStreamPlus in = new DataInputStreamPlus(new ByteArrayInputStream(baos.toByteArray()));
-        MessageIn.readTimestamp(InetAddress.getLocalHost(), in, now);
+        MessageIn.readConstructionTime(InetAddress.getLocalHost(), in);
     }
 
     public static class MockBackPressureStrategy implements BackPressureStrategy<MockBackPressureStrategy.MockBackPressureState>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
index ce94f33..3f6564e 100644
--- a/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
+++ b/test/unit/org/apache/cassandra/net/MockMessagingServiceTest.java
@@ -59,9 +59,7 @@ public class MockMessagingServiceTest
                 EchoMessage.instance,
                 Collections.emptyMap(),
                 MessagingService.Verb.ECHO,
-                MessagingService.current_version,
-                MessageIn.createTimestamp()
-        );
+                MessagingService.current_version);
         MockMessagingSpy spy = MockMessagingService
                 .when(
                         all(
@@ -94,4 +92,4 @@ public class MockMessagingServiceTest
         // and return a mocked response
         assertEquals(1, spy.mockedMessageResponses);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d9a1a12/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 93415ba..b7624ca 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -828,8 +828,7 @@ public class DataResolverTest
                                 ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
-                                MessagingService.current_version,
-                                MessageIn.createTimestamp());
+                                MessagingService.current_version);
     }
 
     private RangeTombstone tombstone(Object start, Object end, long markedForDeleteAt, int localDeletionTime)


Mime
View raw message