gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-36 Persist ring state
Date Fri, 03 Feb 2017 05:31:52 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master 3f1882fbc -> 32c082a0c


GOSSIP-36 Persist ring state


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/32c082a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/32c082a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/32c082a0

Branch: refs/heads/master
Commit: 32c082a0cb9a2717e0beb5ae2b52cfc6b1717745
Parents: 3f1882f
Author: Edward Capriolo <edlinuxguru@gmail.com>
Authored: Sat Jan 28 16:32:29 2017 -0500
Committer: Edward Capriolo <edlinuxguru@gmail.com>
Committed: Tue Jan 31 23:38:33 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipMember.java    |  15 +--
 .../java/org/apache/gossip/GossipSettings.java  |  42 ++++++-
 .../org/apache/gossip/LocalGossipMember.java    |   6 +-
 .../java/org/apache/gossip/StartupSettings.java |   2 +-
 .../examples/StandAloneDatacenterAndRack.java   |  61 ++++++++++
 .../DatacenterRackAwareActiveGossiper.java      |   3 +-
 .../apache/gossip/manager/GossipManager.java    |  63 ++++++++---
 .../gossip/manager/RingStatePersister.java      |  81 +++++++++++++
 .../gossip/manager/UserDataPersister.java       | 111 ++++++++++++++++++
 src/test/java/org/apache/gossip/DataTest.java   |   2 +
 .../org/apache/gossip/IdAndPropertyTest.java    |   5 +-
 .../org/apache/gossip/ShutdownDeadtimeTest.java |   4 +-
 .../org/apache/gossip/TenNodeThreeSeedTest.java |   2 +
 .../apache/gossip/manager/DataReaperTest.java   |   2 +
 .../gossip/manager/RingPersistenceTest.java     |  63 +++++++++++
 .../gossip/manager/UserDataPersistenceTest.java | 113 +++++++++++++++++++
 16 files changed, 544 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/GossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java
index f2834bd..703ac55 100644
--- a/src/main/java/org/apache/gossip/GossipMember.java
+++ b/src/main/java/org/apache/gossip/GossipMember.java
@@ -28,11 +28,11 @@ import java.util.Map;
 public abstract class GossipMember implements Comparable<GossipMember> {
 
   
-  protected final URI uri;
+  protected URI uri;
 
   protected volatile long heartbeat;
 
-  protected final String clusterName;
+  protected String clusterName;
 
   /**
    * The purpose of the id field is to be able for nodes to identify themselves beyond their
@@ -64,6 +64,7 @@ public abstract class GossipMember implements Comparable<GossipMember> {
     this.properties = properties;
   }
 
+  protected GossipMember(){}
   /**
    * Get the name of the cluster the member belongs to.
    * 
@@ -78,7 +79,7 @@ public abstract class GossipMember implements Comparable<GossipMember> {
    * @return The member address in the form IP/host:port Similar to the toString in
    * {@link InetSocketAddress}
    */
-  public String getAddress() {
+  public String computeAddress() {
     return uri.getHost() + ":" + uri.getPort();
   }
 
@@ -118,7 +119,7 @@ public abstract class GossipMember implements Comparable<GossipMember> {
   }
 
   public String toString() {
-    return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
+    return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]";
   }
 
   /**
@@ -128,7 +129,7 @@ public abstract class GossipMember implements Comparable<GossipMember> {
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    String address = getAddress();
+    String address = computeAddress();
     result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0
             : clusterName.hashCode());
     return result;
@@ -155,11 +156,11 @@ public abstract class GossipMember implements Comparable<GossipMember> {
       return false;
     }
     // The object is the same of they both have the same address (hostname and port).
-    return getAddress().equals(((LocalGossipMember) obj).getAddress())
+    return computeAddress().equals(((LocalGossipMember) obj).computeAddress())
             && getClusterName().equals(((LocalGossipMember) obj).getClusterName());
   }
 
   public int compareTo(GossipMember other) {
-    return this.getAddress().compareTo(other.getAddress());
+    return this.computeAddress().compareTo(other.computeAddress());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/GossipSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java
index 1fed914..60a443f 100644
--- a/src/main/java/org/apache/gossip/GossipSettings.java
+++ b/src/main/java/org/apache/gossip/GossipSettings.java
@@ -40,7 +40,7 @@ public class GossipSettings {
   
   /** the threshold for the detector */
   //private double convictThreshold = 2.606201185901408;
-  private double convictThreshold = 4.5;
+  private double convictThreshold = 2.606201185901408;
   
   private String distribution = "exponential";
   
@@ -48,6 +48,14 @@ public class GossipSettings {
   
   private Map<String,String> activeGossipProperties = new HashMap<>();
   
+  private String pathToRingState = "./";
+  
+  private boolean persistRingState = true;
+  
+  private String pathToDataState = "./";
+  
+  private boolean persistDataState = true;
+  
   /**
    * Construct GossipSettings with default settings.
    */
@@ -162,5 +170,37 @@ public class GossipSettings {
   public void setActiveGossipProperties(Map<String, String> activeGossipProperties) {
     this.activeGossipProperties = activeGossipProperties;
   }
+
+  public String getPathToRingState() {
+    return pathToRingState;
+  }
+
+  public void setPathToRingState(String pathToRingState) {
+    this.pathToRingState = pathToRingState;
+  }
+
+  public boolean isPersistRingState() {
+    return persistRingState;
+  }
+
+  public void setPersistRingState(boolean persistRingState) {
+    this.persistRingState = persistRingState;
+  }
+
+  public String getPathToDataState() {
+    return pathToDataState;
+  }
+
+  public void setPathToDataState(String pathToDataState) {
+    this.pathToDataState = pathToDataState;
+  }
+
+  public boolean isPersistDataState() {
+    return persistDataState;
+  }
+
+  public void setPersistDataState(boolean persistDataState) {
+    this.persistDataState = persistDataState;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/LocalGossipMember.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java
index 557ffcb..05874f5 100644
--- a/src/main/java/org/apache/gossip/LocalGossipMember.java
+++ b/src/main/java/org/apache/gossip/LocalGossipMember.java
@@ -29,7 +29,7 @@ import org.apache.gossip.accrual.FailureDetector;
  */
 public class LocalGossipMember extends GossipMember {
   /** The failure detector for this member */
-  private transient final FailureDetector detector;
+  private transient FailureDetector detector;
 
   /**
    * 
@@ -46,6 +46,10 @@ public class LocalGossipMember extends GossipMember {
     detector = new FailureDetector(this, minSamples, windowSize, distribution);
   }
 
+  protected LocalGossipMember(){
+    
+  }
+  
   public void recordHeartbeat(long now){
     detector.recordHeartbeat(now);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/StartupSettings.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java
index 0117be7..ab5f764 100644
--- a/src/main/java/org/apache/gossip/StartupSettings.java
+++ b/src/main/java/org/apache/gossip/StartupSettings.java
@@ -198,7 +198,7 @@ public class StartupSettings {
       RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(),
               uri3, "", 0, new HashMap<String,String>());
       settings.addGossipMember(member);
-      configMembersDetails += member.getAddress();
+      configMembersDetails += member.computeAddress();
       configMembersDetails += ", ";
     }
     log.info(configMembersDetails + "]");

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
new file mode 100644
index 0000000..dfeabd7
--- /dev/null
+++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.examples;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class StandAloneDatacenterAndRack {
+
+  public static void main (String [] args) throws UnknownHostException, InterruptedException {
+    GossipSettings s = new GossipSettings();
+    s.setWindowSize(10);
+    s.setConvictThreshold(1.0);
+    s.setGossipInterval(1000);
+    s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
+    Map<String, String> gossipProps = new HashMap<>();
+    gossipProps.put("sameRackGossipIntervalMs", "2000");
+    gossipProps.put("differentDatacenterGossipIntervalMs", "10000");
+    s.setActiveGossipProperties(gossipProps);
+    
+    
+    Map<String, String> props = new HashMap<>();
+    props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]);
+    props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]);
+    GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
+            props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])),
+            s, (a, b) -> { }, new MetricRegistry());
+    gossipService.start();
+    while (true){
+      System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
+      System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
+      Thread.sleep(2000);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
index 40b9c28..4f5dfdc 100644
--- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
+++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java
@@ -180,7 +180,8 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
   }
 
   private void sendToSameRackMember() {
-    sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
+    LocalGossipMember i = selectPartner(sameRackNodes());
+    sendMembershipList(gossipManager.getMyself(), i);
   }
   
   private void sendToSameRackMemberPerNode() {

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 840efb9..04afc28 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -54,35 +54,24 @@ public abstract class GossipManager {
   public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
 
   private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
-
   private final LocalGossipMember me;
-
   private final GossipSettings settings;
-
   private final AtomicBoolean gossipServiceRunning;
-
   private final GossipListener listener;
-
   private AbstractActiveGossiper activeGossipThread;
-
   private PassiveGossipThread passiveGossipThread;
-
   private ExecutorService gossipThreadExecutor;
-  
   private final GossipCore gossipCore;
-  
   private final DataReaper dataReaper;
-  
   private final Clock clock;
-  
   private final ScheduledExecutorService scheduledServiced;
-
-  private MetricRegistry registry;
-
+  private final MetricRegistry registry;
+  private final RingStatePersister ringState;
+  private final UserDataPersister userDataState;
+  
   public GossipManager(String cluster,
           URI uri, String id, Map<String,String> properties, GossipSettings settings,
           List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
-    
     this.settings = settings;
     gossipCore = new GossipCore(this, registry);
     clock = new SystemClock();
@@ -105,6 +94,10 @@ public abstract class GossipManager {
     this.listener = listener;
     this.scheduledServiced = Executors.newScheduledThreadPool(1);
     this.registry = registry;
+    this.ringState = new RingStatePersister(this);
+    this.userDataState = new UserDataPersister(this, this.gossipCore);
+    readSavedRingState();
+    readSavedDataState();
   }
 
   public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
@@ -150,6 +143,7 @@ public abstract class GossipManager {
       throw new RuntimeException(e);
     }
   }
+  
   /**
    * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
    * thread and start the receiver thread.
@@ -160,13 +154,14 @@ public abstract class GossipManager {
     activeGossipThread = constructActiveGossiper();
     activeGossipThread.init();
     dataReaper.init();
+    scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
+    scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
     scheduledServiced.scheduleAtFixedRate(() -> {
       try {
         for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
           Double result = null;
           try {
             result = entry.getKey().detect(clock.nanoTime());
-            //System.out.println(entry.getKey() +" "+ result);
             if (result != null) {
               if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
                 members.put(entry.getKey(), GossipState.DOWN);
@@ -195,6 +190,27 @@ public abstract class GossipManager {
     LOGGER.debug("The GossipManager is started.");
   }
 
+  private void readSavedRingState() {
+    for (LocalGossipMember l : ringState.readFromDisk()){
+      LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
+              l.getUri(), l.getId(),
+              clock.nanoTime(), l.getProperties(), settings.getWindowSize(), 
+              settings.getMinimumSamples(), settings.getDistribution());
+      members.putIfAbsent(member, GossipState.DOWN);
+    }
+  }
+  
+  private void readSavedDataState() {
+    for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
+      for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){
+        gossipCore.addPerNodeData(j.getValue());
+      }
+    }
+    for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
+      gossipCore.addSharedData(l.getValue());
+    }
+  }
+
   /**
    * Shutdown the gossip service.
    */
@@ -217,6 +233,14 @@ public abstract class GossipManager {
     } catch (InterruptedException e) {
       LOGGER.error(e);
     }
+    gossipThreadExecutor.shutdownNow();
+    scheduledServiced.shutdown();
+    try {
+      scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOGGER.error(e);
+    }
+    scheduledServiced.shutdownNow();
   }
   
   public void gossipPerNodeData(GossipDataMessage message){
@@ -266,6 +290,13 @@ public abstract class GossipManager {
   public DataReaper getDataReaper() {
     return dataReaper;
   }
+
+  public RingStatePersister getRingState() {
+    return ringState;
+  }
             
+  public UserDataPersister getUserDataState() {
+    return userDataState;
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/manager/RingStatePersister.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
new file mode 100644
index 0000000..24b464a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ 
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.gossip.LocalGossipMember;
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class RingStatePersister implements Runnable {
+
+  private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final TypeReference<ArrayList<LocalGossipMember>> REF 
+    = new TypeReference<ArrayList<LocalGossipMember>>() { };
+  private GossipManager parent;
+  
+  public RingStatePersister(GossipManager parent){
+    this.parent = parent;
+  }
+  
+  @Override
+  public void run() {
+    writeToDisk();
+  }
+  
+  File computeTarget(){
+    return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "." 
+            + parent.getMyself().getId() + ".json");
+  }
+  
+  void writeToDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return;
+    }
+    NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
+    try (FileOutputStream fos = new FileOutputStream(computeTarget())){
+      MAPPER.writeValue(fos, i);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+  }
+
+  List<LocalGossipMember> readFromDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return Collections.emptyList();
+    }
+    try (FileInputStream fos = new FileInputStream(computeTarget())){
+      return MAPPER.readValue(fos, REF);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return Collections.emptyList();
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/main/java/org/apache/gossip/manager/UserDataPersister.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
new file mode 100644
index 0000000..c67677a
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ 
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.apache.log4j.Logger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class UserDataPersister implements Runnable {
+  
+  private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private final GossipManager parent;
+  private final GossipCore gossipCore; 
+  
+  UserDataPersister(GossipManager parent, GossipCore gossipCore){
+    this.parent = parent;
+    this.gossipCore = gossipCore;
+    MAPPER.enableDefaultTyping();
+  }
+  
+  File computeSharedTarget(){
+    return new File(parent.getSettings().getPathToDataState(), "shareddata."
+            + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
+  }
+  
+  File computePerNodeTarget() {
+    return new File(parent.getSettings().getPathToDataState(), "pernodedata."
+            + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
+  }
+  
+  @SuppressWarnings("unchecked")
+  ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+    }
+    try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
+      return MAPPER.readValue(fos, ConcurrentHashMap.class);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
+  }
+  
+  void writePerNodeToDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return;
+    }
+    try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
+      MAPPER.writeValue(fos, gossipCore.getPerNodeData());
+    } catch (IOException e) {
+      LOGGER.warn(e);
+    }
+  }
+  
+  void writeSharedToDisk(){
+    if (!parent.getSettings().isPersistDataState()){
+      return;
+    }
+    try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
+      MAPPER.writeValue(fos, gossipCore.getSharedData());
+    } catch (IOException e) {
+      LOGGER.warn(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){
+    if (!parent.getSettings().isPersistRingState()){
+      return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+    }
+    try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
+      return MAPPER.readValue(fos, ConcurrentHashMap.class);
+    } catch (IOException e) {
+      LOGGER.debug(e);
+    }
+    return new ConcurrentHashMap<String, SharedGossipDataMessage>();
+  }
+  
+  /**
+   * Writes all pernode and shared data to disk 
+   */
+  @Override
+  public void run() {
+    writePerNodeToDisk();
+    writeSharedToDisk();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 83879f9..98c7ee0 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -39,6 +39,8 @@ public class DataTest {
   @Test
   public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
     GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 1;
     List<GossipMember> startupMembers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/IdAndPropertyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
index 2a98f01..1eb0aee 100644
--- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java
+++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java
@@ -25,7 +25,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeUnit; 
 
 import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
 import org.junit.jupiter.api.Test;
@@ -75,8 +75,7 @@ public class IdAndPropertyTest {
         value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a");
       } catch (RuntimeException e){ }
       return value;
-    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
-    
+    }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");    
     gossipService1.shutdown();
     gossipService2.shutdown();
     

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
index 9d02556..2386084 100644
--- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
+++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java
@@ -47,7 +47,9 @@ public class ShutdownDeadtimeTest {
   @Test
   public void DeadNodesDoNotComeAliveAgain()
           throws InterruptedException, UnknownHostException, URISyntaxException {
-    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0, "exponential");
+    GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 3;
     List<GossipMember> startupMembers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
index bc4004d..aa797f5 100644
--- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
+++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java
@@ -48,6 +48,8 @@ public class TenNodeThreeSeedTest {
 
   public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
     GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 3;
     List<GossipMember> startupMembers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/manager/DataReaperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index b4ac45d..a9c861c 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -39,6 +39,8 @@ public class DataReaperTest {
   @Test
   public void testReaperOneShot() {
     GossipSettings settings = new GossipSettings();
+    settings.setPersistRingState(false);
+    settings.setPersistDataState(false);
     GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
             .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
     gm.init();

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
new file mode 100644
index 0000000..6e41bdc
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.RemoteGossipMember;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class RingPersistenceTest {
+
+  @Test
+  public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException {
+    GossipSettings settings = new GossipSettings();
+    File f = aGossiperPersists(settings);
+    Assert.assertTrue(f.exists());
+    aNewInstanceGetsRingInfo(settings);
+    f.delete();
+  }
+  
+  private File aGossiperPersists(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {
+    GossipService gossipService = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
+            Arrays.asList(
+                    new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
+                    new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2")
+                    ),
+            settings, (a, b) -> { }, new MetricRegistry());
+    gossipService.getGossipManager().getRingState().writeToDisk();
+    return gossipService.getGossipManager().getRingState().computeTarget();
+  }
+  
+  private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException{
+    GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<String, String>(),
+            Arrays.asList(),
+            settings, (a, b) -> { }, new MetricRegistry());
+    Assert.assertEquals(2, gossipService2.getGossipManager().getMembers().size());
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/32c082a0/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
new file mode 100644
index 0000000..e0cbcf4
--- /dev/null
+++ b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.manager;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.gossip.GossipService;
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+
+public class UserDataPersistenceTest {
+
+  @Test
+  public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException {
+    String nodeId = "1";
+    GossipSettings settings = new GossipSettings();
+    { //Create a gossip service and force it to persist its user data
+      GossipService gossipService = new GossipService("a",
+              new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
+              Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
+      gossipService.start();
+      gossipService.gossipPerNodeData(getToothpick());
+      gossipService.gossipSharedData(getAnotherToothpick());
+      gossipService.getGossipManager().getUserDataState().writePerNodeToDisk();
+      gossipService.getGossipManager().getUserDataState().writeSharedToDisk();
+      { //read the raw data and confirm
+        ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> l = gossipService.getGossipManager().getUserDataState().readPerNodeFromDisk();
+        Assert.assertEquals("red", ((AToothpick) l.get(nodeId).get("a").getPayload()).getColor());
+      }
+      {
+        ConcurrentHashMap<String, SharedGossipDataMessage> l = 
+                gossipService.getGossipManager().getUserDataState().readSharedDataFromDisk();
+        Assert.assertEquals("blue", ((AToothpick) l.get("a").getPayload()).getColor());
+      }
+      gossipService.shutdown();
+    }
+    { //recreate the service and see that the data is read back in
+      GossipService gossipService = new GossipService("a",
+              new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap<String, String>(),
+              Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry());
+      gossipService.start();
+      Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeData(nodeId, "a").getPayload()).getColor());
+      Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedData("a").getPayload()).getColor());
+      File f = gossipService.getGossipManager().getUserDataState().computeSharedTarget();
+      File g = gossipService.getGossipManager().getUserDataState().computePerNodeTarget();
+      gossipService.shutdown();
+      f.delete();
+      g.delete();
+    }
+  }
+  
+  public GossipDataMessage getToothpick(){
+    AToothpick a = new AToothpick();
+    a.setColor("red");
+    GossipDataMessage d = new GossipDataMessage();
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setKey("a");
+    d.setPayload(a);
+    d.setTimestamp(System.currentTimeMillis());
+    return d;
+  }
+  
+  public SharedGossipDataMessage getAnotherToothpick(){
+    AToothpick a = new AToothpick();
+    a.setColor("blue");
+    SharedGossipDataMessage d = new SharedGossipDataMessage();
+    d.setExpireAt(Long.MAX_VALUE);
+    d.setKey("a");
+    d.setPayload(a);
+    d.setTimestamp(System.currentTimeMillis());
+    return d;
+  }
+  
+  public static class AToothpick {
+    private String color;
+    public AToothpick(){
+      
+    }
+    public String getColor() {
+      return color;
+    }
+    public void setColor(String color) {
+      this.color = color;
+    }
+    
+  }
+}



Mime
View raw message