cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] cassandra git commit: Expose phi values from failure detector via JMX
Date Mon, 19 Oct 2015 14:57:14 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 cf0c61b8e -> 7159a754a


Expose phi values from failure detector via JMX

patch by Ron Kuris and Ariel Weisberg; reviewed by Ariel Weisberg and
Brandon Williams for CASSANDRA-9526


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3cd75001
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3cd75001
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3cd75001

Branch: refs/heads/cassandra-3.0
Commit: 3cd750012e0cf3b55442c036627fb032c29c16bc
Parents: 0bfa26d
Author: Ron Kuris <ron.kuris@gmail.com>
Authored: Mon Oct 19 15:50:58 2015 +0100
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Mon Oct 19 15:50:58 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/gms/FailureDetector.java   | 82 +++++++++++++++-----
 .../cassandra/gms/FailureDetectorMBean.java     |  5 ++
 .../org/apache/cassandra/tools/NodeProbe.java   | 15 +++-
 .../org/apache/cassandra/tools/NodeTool.java    |  3 +-
 .../tools/nodetool/FailureDetectorInfo.java     | 46 +++++++++++
 6 files changed, 132 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0904559..3eff22c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.2.4
+ * Expose phi values from failure detector via JMX and tweak debug
+   and trace logging (CASSANDRA-9526)
  * Fix RangeNamesQueryPager (CASSANDRA-10509)
  * Deprecate Pig support (CASSANDRA-10542)
  * Reduce contention getting instances of CompositeType (CASSANDRA-10433)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index 861a853..c563872 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -22,10 +22,13 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=FailureDetector";
     private static final int SAMPLE_SIZE = 1000;
     protected static final long INITIAL_VALUE_NANOS = TimeUnit.NANOSECONDS.convert(getInitialValue(), TimeUnit.MILLISECONDS);
+    private static final int DEBUG_PERCENTAGE = 80; // if the phi is larger than this percentage of the max, log a debug message
     private static final long DEFAULT_MAX_PAUSE = 5000L * 1000000L; // 5 seconds
     private static final long MAX_LOCAL_PAUSE_IN_NANOS = getMaxLocalPause();
     private long lastInterpret = System.nanoTime();
@@ -71,8 +75,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     // change.
     private final double PHI_FACTOR = 1.0 / Math.log(10.0); // 0.434...
 
-    private final Map<InetAddress, ArrivalWindow> arrivalSamples = new Hashtable<InetAddress, ArrivalWindow>();
-    private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<IFailureDetectionEventListener>();
+    private final ConcurrentHashMap<InetAddress, ArrivalWindow> arrivalSamples = new ConcurrentHashMap<>();
+    private final List<IFailureDetectionEventListener> fdEvntListeners = new CopyOnWriteArrayList<>();
 
     public FailureDetector()
     {
@@ -148,6 +152,34 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
         return count;
     }
 
+    @Override
+    public TabularData getPhiValues() throws OpenDataException
+    {
+        final CompositeType ct = new CompositeType("Node", "Node",
+                new String[]{"Endpoint", "PHI"},
+                new String[]{"IP of the endpoint", "PHI value"},
+                new OpenType[]{SimpleType.STRING, SimpleType.DOUBLE});
+        final TabularDataSupport results = new TabularDataSupport(new TabularType("PhiList", "PhiList", ct, new String[]{"Endpoint"}));
+
+        for (final Map.Entry<InetAddress, ArrivalWindow> entry : arrivalSamples.entrySet())
+        {
+            final ArrivalWindow window = entry.getValue();
+            if (window.mean() > 0)
+            {
+                final double phi = window.getLastReportedPhi();
+                if (phi != Double.MIN_VALUE)
+                {
+                    // returned values are scaled by PHI_FACTOR so that the are on the same scale as PhiConvictThreshold
+                    final CompositeData data = new CompositeDataSupport(ct,
+                            new String[]{"Endpoint", "PHI"},
+                            new Object[]{entry.getKey().toString(), phi * PHI_FACTOR});
+                    results.put(data);
+                }
+            }
+        }
+        return results;
+    }
+
     public String getEndpointState(String address) throws UnknownHostException
     {
         StringBuilder sb = new StringBuilder();
@@ -219,8 +251,6 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
 
     public void report(InetAddress ep)
     {
-        if (logger.isTraceEnabled())
-            logger.trace("reporting {}", ep);
         long now = System.nanoTime();
         ArrivalWindow heartbeatWindow = arrivalSamples.get(ep);
         if (heartbeatWindow == null)
@@ -228,12 +258,17 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
             // avoid adding an empty ArrivalWindow to the Map
             heartbeatWindow = new ArrivalWindow(SAMPLE_SIZE);
             heartbeatWindow.add(now, ep);
-            arrivalSamples.put(ep, heartbeatWindow);
+            heartbeatWindow = arrivalSamples.putIfAbsent(ep, heartbeatWindow);
+            if (heartbeatWindow != null)
+                heartbeatWindow.add(now, ep);
         }
         else
         {
             heartbeatWindow.add(now, ep);
         }
+
+        if (logger.isTraceEnabled() && heartbeatWindow != null)
+            logger.info("Average for {} is {}", ep, heartbeatWindow.mean());
     }
 
     public void interpret(InetAddress ep)
@@ -263,13 +298,22 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
 
         if (PHI_FACTOR * phi > getPhiConvictThreshold())
         {
-            logger.trace("notifying listeners that {} is down", ep);
-            logger.trace("intervals: {} mean: {}", hbWnd, hbWnd.mean());
+            if (logger.isTraceEnabled())
+                logger.trace("Node {} phi {} > {}; intervals: {} mean: {}", new Object[]{ep, PHI_FACTOR * phi, getPhiConvictThreshold(), hbWnd, hbWnd.mean()});
             for (IFailureDetectionEventListener listener : fdEvntListeners)
             {
                 listener.convict(ep, phi);
             }
         }
+        else if (logger.isDebugEnabled() && (PHI_FACTOR * phi * DEBUG_PERCENTAGE / 100.0 > getPhiConvictThreshold()))
+        {
+            logger.debug("PHI for {} : {}", ep, phi);
+        }
+        else if (logger.isTraceEnabled())
+        {
+            logger.trace("PHI for {} : {}", ep, phi);
+            logger.trace("mean for {} : {}", ep, hbWnd.mean());
+        }
     }
 
     public void forceConviction(InetAddress ep)
@@ -312,10 +356,6 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
         sb.append("-----------------------------------------------------------------------");
         return sb.toString();
     }
-
-    public static void main(String[] args)
-    {
-    }
 }
 
 /*
@@ -372,12 +412,7 @@ class ArrivalWindow
     private static final Logger logger = LoggerFactory.getLogger(ArrivalWindow.class);
     private long tLast = 0L;
     private final ArrayBackedBoundedStats arrivalIntervals;
-
-    // this is useless except to provide backwards compatibility in phi_convict_threshold,
-    // because everyone seems pretty accustomed to the default of 8, and users who have
-    // already tuned their phi_convict_threshold for their own environments won't need to
-    // change.
-    private final double PHI_FACTOR = 1.0 / Math.log(10.0);
+    private double lastReportedPhi = Double.MIN_VALUE;
 
     // in the event of a long partition, never record an interval longer than the rpc timeout,
     // since if a host is regularly experiencing connectivity problems lasting this long we'd
@@ -411,9 +446,14 @@ class ArrivalWindow
         {
             long interArrivalTime = (value - tLast);
             if (interArrivalTime <= MAX_INTERVAL_IN_NANO)
+            {
                 arrivalIntervals.add(interArrivalTime);
+                logger.trace("Reporting interval time of {} for {}", interArrivalTime, ep);
+            }
             else
+            {
                 logger.debug("Ignoring interval time of {} for {}", interArrivalTime, ep);
+            }
         }
         else
         {
@@ -435,7 +475,13 @@ class ArrivalWindow
     {
         assert arrivalIntervals.mean() > 0 && tLast > 0; // should not be called before any samples arrive
         long t = tnow - tLast;
-        return t / mean();
+        lastReportedPhi = t / mean();
+        return lastReportedPhi;
+    }
+
+    double getLastReportedPhi()
+    {
+        return lastReportedPhi;
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
index 45250b4..23fae3a 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetectorMBean.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.gms;
 import java.net.UnknownHostException;
 import java.util.Map;
 
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.TabularData;
+
 public interface FailureDetectorMBean
 {
     public void dumpInterArrivalTimes();
@@ -37,4 +40,6 @@ public interface FailureDetectorMBean
     public int getDownEndpointCount();
 
     public int getUpEndpointCount();
+
+    public TabularData getPhiValues() throws OpenDataException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index da403ab..62795b5 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -27,7 +27,6 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.rmi.server.RMIClientSocketFactory;
 import java.rmi.server.RMISocketFactory;
-import java.text.SimpleDateFormat;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -522,7 +521,7 @@ public class NodeProbe implements AutoCloseable
 
     /**
      * Take a snapshot of all column family from different keyspaces.
-     * 
+     *
      * @param snapshotName
      *            the name of the snapshot.
      * @param columnFamilyList
@@ -1277,6 +1276,18 @@ public class NodeProbe implements AutoCloseable
             }
         }
     }
+
+    public TabularData getFailureDetectorPhilValues()
+    {
+        try
+        {
+            return fdProxy.getPhiValues();
+        }
+        catch (OpenDataException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, ColumnFamilyStoreMBean>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index da3560d..175b325 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -129,7 +129,8 @@ public class NodeTool
                 TpStats.class,
                 TopPartitions.class,
                 SetLoggingLevel.class,
-                GetLoggingLevels.class
+                GetLoggingLevels.class,
+                FailureDetectorInfo.class
         );
 
         Cli.CliBuilder<Runnable> builder = Cli.builder("nodetool");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3cd75001/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
new file mode 100644
index 0000000..72c109a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/FailureDetectorInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.command.Command;
+
+import java.util.List;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+@Command(name = "failuredetector", description = "Shows the failure detector information for the cluster")
+public class FailureDetectorInfo extends NodeToolCmd
+{
+    @Override
+    public void execute(NodeProbe probe)
+    {
+        TabularData data = probe.getFailureDetectorPhilValues();
+        System.out.printf("%10s,%16s\n", "Endpoint", "Phi");
+        for (Object o : data.keySet())
+        {
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            CompositeData datum = data.get(((List) o).toArray(new Object[((List) o).size()]));
+            System.out.printf("%10s,%16.8f\n",datum.get("Endpoint"), datum.get("PHI"));
+        }
+    }
+}
+


Mime
View raw message