gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-49 Refactor Failure detector Lambda into named class
Date Thu, 16 Mar 2017 19:31:40 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master 4eafd58ec -> 0136dd939


GOSSIP-49 Refactor Failure detector Lambda into named class


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/0136dd93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/0136dd93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/0136dd93

Branch: refs/heads/master
Commit: 0136dd9395b3bddd5a8ba00f34f94d13c088b10d
Parents: 4eafd58
Author: Maxim Rusak <mak-rusak@yandex.ru>
Authored: Thu Mar 16 18:06:49 2017 +0300
Committer: Maxim Rusak <mak-rusak@yandex.ru>
Committed: Thu Mar 16 18:14:04 2017 +0300

----------------------------------------------------------------------
 .../apache/gossip/manager/GossipManager.java    | 109 +++++------------
 .../manager/GossipMemberStateRefresher.java     | 117 +++++++++++++++++++
 .../gossip/manager/PassiveGossipThread.java     |   1 +
 .../handlers/MessageInvokerCombiner.java        |  10 +-
 .../org/apache/gossip/ShutdownDeadtimeTest.java |   1 +
 5 files changed, 150 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/0136dd93/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index ba8517b..d2f5d20 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -19,9 +19,9 @@ package org.apache.gossip.manager;
 
 import com.codahale.metrics.MetricRegistry;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.gossip.Member;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.LocalMember;
+import org.apache.gossip.Member;
 import org.apache.gossip.crdt.Crdt;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
@@ -29,7 +29,6 @@ import org.apache.gossip.manager.handlers.MessageInvoker;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
 import org.apache.gossip.model.PerNodeDataMessage;
 import org.apache.gossip.model.SharedDataMessage;
-import org.apache.gossip.model.ShutdownMessage;
 import org.apache.log4j.Logger;
 
 import java.lang.reflect.Constructor;
@@ -63,6 +62,7 @@ public abstract class GossipManager {
   private final MetricRegistry registry;
   private final RingStatePersister ringState;
   private final UserDataPersister userDataState;
+  private final GossipMemberStateRefresher memberStateRefresher;
   private final ObjectMapper objectMapper;
 
   private final MessageInvoker messageInvoker;
@@ -73,7 +73,8 @@ public abstract class GossipManager {
                        ObjectMapper objectMapper, MessageInvoker messageInvoker) {
     this.settings = settings;
     this.messageInvoker = messageInvoker;
-    clock = new SystemClock();    
+
+    clock = new SystemClock();
     me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
             settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
     gossipCore = new GossipCore(this, registry);
@@ -83,7 +84,7 @@ public abstract class GossipManager {
       if (!startupMember.equals(me)) {
         LocalMember member = new LocalMember(startupMember.getClusterName(),
                 startupMember.getUri(), startupMember.getId(),
-                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
+                clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
                 settings.getMinimumSamples(), settings.getDistribution());
         //TODO should members start in down state?
         members.put(member, GossipState.DOWN);
@@ -96,6 +97,7 @@ public abstract class GossipManager {
     this.registry = registry;
     this.ringState = new RingStatePersister(this);
     this.userDataState = new UserDataPersister(this, this.gossipCore);
+    this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
     this.objectMapper = objectMapper;
     readSavedRingState();
     readSavedDataState();
@@ -119,21 +121,21 @@ public abstract class GossipManager {
   public List<LocalMember> getDeadMembers() {
     return Collections.unmodifiableList(
             members.entrySet()
-            .stream()
-            .filter(entry -> GossipState.DOWN.equals(entry.getValue()))
-            .map(Entry::getKey).collect(Collectors.toList()));
+                    .stream()
+                    .filter(entry -> GossipState.DOWN.equals(entry.getValue()))
+                    .map(Entry::getKey).collect(Collectors.toList()));
   }
 
   /**
-   * 
+   *
    * @return a read only list of members found in the UP state
    */
   public List<LocalMember> getLiveMembers() {
     return Collections.unmodifiableList(
             members.entrySet()
-            .stream()
-            .filter(entry -> GossipState.UP.equals(entry.getValue()))
-            .map(Entry::getKey).collect(Collectors.toList()));
+                    .stream()
+                    .filter(entry -> GossipState.UP.equals(entry.getValue()))
+                    .map(Entry::getKey).collect(Collectors.toList()));
   }
 
   public LocalMember getMyself() {
@@ -148,7 +150,7 @@ public abstract class GossipManager {
       throw new RuntimeException(e);
     }
   }
-  
+
   /**
    * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
    * thread and start the receiver thread.
@@ -161,77 +163,20 @@ public abstract class GossipManager {
     dataReaper.init();
     scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
     scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
-    scheduledServiced.scheduleAtFixedRate(() -> {
-      try {
-        for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
-          boolean userDown = processOptomisticShutdown(entry);
-          if (userDown)
-            continue;
-          Double result = null;
-          try {
-            result = entry.getKey().detect(clock.nanoTime());
-            if (result != null) {
-              if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
-                members.put(entry.getKey(), GossipState.DOWN);
-                listener.gossipEvent(entry.getKey(), GossipState.DOWN);
-              }
-              if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) {
-                members.put(entry.getKey(), GossipState.UP);
-                listener.gossipEvent(entry.getKey(), GossipState.UP);
-              }
-            }
-          } catch (IllegalArgumentException ex) {
-            //0.0 returns throws exception computing the mean. 
-            long now = clock.nanoTime(); 
-            long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS);
-            if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){
-              LOGGER.warn("Marking down");
-              members.put(entry.getKey(), GossipState.DOWN);
-              listener.gossipEvent(entry.getKey(), GossipState.DOWN);
-            }
-          } //end catch
-        } // end for
-      } catch (RuntimeException ex) {
-        LOGGER.warn("scheduled state had exception", ex);
-      }
-    }, 0, 100, TimeUnit.MILLISECONDS);
+    scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
     LOGGER.debug("The GossipManager is started.");
   }
 
-  /**
-   * If we have a special key the per-node data that means that the node has sent us 
-   * a pre-emptive shutdown message. We process this so node is seen down sooner
-   * @param l member to consider
-   * @return true if node forced down
-   */
-  public boolean processOptomisticShutdown(Entry<LocalMember, GossipState> l){
-    PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
-    if (m == null){
-      return false;
-    }
-    ShutdownMessage s = (ShutdownMessage) m.getPayload();
-    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){
-      if (l.getValue() == GossipState.UP){
-        members.put(l.getKey(), GossipState.DOWN);
-        listener.gossipEvent(l.getKey(), GossipState.DOWN);
-      } else {
-        members.put(l.getKey(), GossipState.DOWN);
-      }
-      return true;
-    }
-    return false;
-  }
-  
   private void readSavedRingState() {
     for (LocalMember l : ringState.readFromDisk()){
       LocalMember member = new LocalMember(l.getClusterName(),
               l.getUri(), l.getId(),
-              clock.nanoTime(), l.getProperties(), settings.getWindowSize(), 
+              clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
               settings.getMinimumSamples(), settings.getDistribution());
       members.putIfAbsent(member, GossipState.DOWN);
     }
   }
-  
+
   private void readSavedDataState() {
     for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
       for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
@@ -274,7 +219,7 @@ public abstract class GossipManager {
     }
     scheduledServiced.shutdownNow();
   }
-  
+
   public void gossipPerNodeData(PerNodeDataMessage message){
     Objects.nonNull(message.getKey());
     Objects.nonNull(message.getTimestamp());
@@ -282,7 +227,7 @@ public abstract class GossipManager {
     message.setNodeId(me.getId());
     gossipCore.addPerNodeData(message);
   }
-  
+
   public void gossipSharedData(SharedDataMessage message){
     Objects.nonNull(message.getKey());
     Objects.nonNull(message.getTimestamp());
@@ -290,7 +235,7 @@ public abstract class GossipManager {
     message.setNodeId(me.getId());
     gossipCore.addSharedData(message);
   }
-  
+
 
   @SuppressWarnings("rawtypes")
   public Crdt findCrdt(String key){
@@ -304,7 +249,7 @@ public abstract class GossipManager {
       return (Crdt) l.getPayload();
     }
   }
-  
+
   @SuppressWarnings("rawtypes")
   public Crdt merge(SharedDataMessage message){
     Objects.nonNull(message.getKey());
@@ -316,7 +261,7 @@ public abstract class GossipManager {
     }
     return gossipCore.merge(message);
   }
-  
+
   public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
     ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
     if (j == null){
@@ -332,7 +277,7 @@ public abstract class GossipManager {
       return l;
     }
   }
-  
+
   public SharedDataMessage findSharedGossipData(String key){
     SharedDataMessage l = gossipCore.getSharedData().get(key);
     if (l == null){
@@ -352,11 +297,15 @@ public abstract class GossipManager {
   public RingStatePersister getRingState() {
     return ringState;
   }
-            
+
   public UserDataPersister getUserDataState() {
     return userDataState;
   }
 
+  public GossipMemberStateRefresher getMemberStateRefresher() {
+    return memberStateRefresher;
+  }
+
   public Clock getClock() {
     return clock;
   }
@@ -368,5 +317,5 @@ public abstract class GossipManager {
   public MetricRegistry getRegistry() {
     return registry;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/0136dd93/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
new file mode 100644
index 0000000..ad2e055
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+public class GossipMemberStateRefresher implements Runnable {
+  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
+
+  private final Map<LocalMember, GossipState> members;
+  private final GossipSettings settings;
+  private final GossipListener listener;
+  private final Clock clock;
+  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
+
+  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
+                                    GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+    this.members = members;
+    this.settings = settings;
+    this.listener = listener;
+    this.findPerNodeGossipData = findPerNodeGossipData;
+    clock = new SystemClock();
+  }
+
+  public void run() {
+    try {
+      runOnce();
+    } catch (RuntimeException ex) {
+      LOGGER.warn("scheduled state had exception", ex);
+    }
+  }
+
+  public void runOnce() {
+    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
+      boolean userDown = processOptimisticShutdown(entry);
+      if (userDown)
+        continue;
+      try {
+        Double phiMeasure = entry.getKey().detect(clock.nanoTime());
+        if (phiMeasure != null) {
+          GossipState requiredState = calcRequiredState(phiMeasure);
+          if (entry.getValue() != requiredState) {
+            members.put(entry.getKey(), requiredState);
+            listener.gossipEvent(entry.getKey(), requiredState);
+          }
+        }
+      } catch (IllegalArgumentException ex) {
+        //0.0 returns throws exception computing the mean.
+
+        long now = clock.nanoTime();
+        long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
+        if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
+          LOGGER.warn("Marking down");
+          members.put(entry.getKey(), GossipState.DOWN);
+          listener.gossipEvent(entry.getKey(), GossipState.DOWN);
+        }
+      } //end catch
+    } // end for
+  }
+
+  public GossipState calcRequiredState(Double phiMeasure) {
+    if (phiMeasure > settings.getConvictThreshold())
+      return GossipState.DOWN;
+    else
+      return GossipState.UP;
+  }
+
+  /**
+   * If we have a special key the per-node data that means that the node has sent us
+   * a pre-emptive shutdown message. We process this so node is seen down sooner
+   *
+   * @param l member to consider
+   * @return true if node forced down
+   */
+  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
+    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
+    if (m == null) {
+      return false;
+    }
+    ShutdownMessage s = (ShutdownMessage) m.getPayload();
+    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
+      members.put(l.getKey(), GossipState.DOWN);
+      if (l.getValue() == GossipState.UP) {
+        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+      }
+      return true;
+    }
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/0136dd93/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
index bfce2dd..ae28bf7 100644
--- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java
@@ -85,6 +85,7 @@ abstract public class PassiveGossipThread implements Runnable {
             gossipCore.receive(activeGossipMessage);
             unsigned.mark();
           }
+          gossipManager.getMemberStateRefresher().run();
         } catch (RuntimeException ex) {//TODO trap json exception
           LOGGER.error("Unable to process message", ex);
         }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/0136dd93/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
index 964da86..cc6ef52 100644
--- a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
+++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java
@@ -24,6 +24,7 @@ import org.apache.gossip.model.Base;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Predicate;
 
 public class MessageInvokerCombiner implements MessageInvoker {
   private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
@@ -32,14 +33,7 @@ public class MessageInvokerCombiner implements MessageInvoker {
   }
 
   public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
-    if (invokers == null) {
-      return false;
-    }
-    boolean result = false;
-    for (MessageInvoker mi : invokers) {
-      result = mi.invoke(gossipCore, gossipManager, base) || result;
-    }
-    return result;
+    return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
   }
 
   public void clear() {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/0136dd93/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 48fb2cb..a91480e 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -81,6 +81,7 @@ public class ShutdownDeadtimeTest {
         return total;
       }
     }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
+
     // shutdown one client and verify that one client is lost.
     Random r = new Random();
     int randomClientId = r.nextInt(clusterMembers);


Mime
View raw message