gossip-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecapri...@apache.org
Subject incubator-gossip git commit: GOSSIP-26 Gossip shared data
Date Fri, 07 Oct 2016 07:08:08 GMT
Repository: incubator-gossip
Updated Branches:
  refs/heads/master f35dddd8f -> 201b101a9


GOSSIP-26 Gossip shared data


Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/201b101a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/201b101a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/201b101a

Branch: refs/heads/master
Commit: 201b101a91cf02d4ef2b0d9536cf0ceda99f6115
Parents: f35dddd
Author: Edward Capriolo <edlinuxguru@gmail.com>
Authored: Fri Oct 7 03:04:59 2016 -0400
Committer: Edward Capriolo <edlinuxguru@gmail.com>
Committed: Fri Oct 7 03:04:59 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/gossip/GossipService.java   | 21 ++++++++-
 .../gossip/manager/ActiveGossipThread.java      | 42 +++++++++++++++--
 .../org/apache/gossip/manager/DataReaper.java   | 14 +++++-
 .../org/apache/gossip/manager/GossipCore.java   | 25 ++++++++++-
 .../apache/gossip/manager/GossipManager.java    | 23 +++++++++-
 src/main/java/org/apache/gossip/model/Base.java |  5 ++-
 .../gossip/model/SharedGossipDataMessage.java   | 47 ++++++++++++++++++++
 .../gossip/udp/UdpSharedGossipDataMessage.java  | 31 +++++++++++++
 src/test/java/org/apache/gossip/DataTest.java   | 47 +++++++++++++++-----
 .../apache/gossip/manager/DataReaperTest.java   | 25 ++++++++---
 10 files changed, 252 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/GossipService.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java
index ab0da97..e50f260 100644
--- a/src/main/java/org/apache/gossip/GossipService.java
+++ b/src/main/java/org/apache/gossip/GossipService.java
@@ -24,7 +24,8 @@ import java.util.List;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.manager.GossipManager;
 import org.apache.gossip.manager.random.RandomGossipManager;
-import org.apache.gossip.model.GossipDataMessage; 
+import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
 import org.apache.log4j.Logger;
 
 /**
@@ -97,7 +98,23 @@ public class GossipService {
    * @return return the value if found or null if not found or expired
    */
   public GossipDataMessage findPerNodeData(String nodeId, String key){ 
-    return getGossipManager().findGossipData(nodeId, key);
+    return getGossipManager().findPerNodeGossipData(nodeId, key);
   }
 
+  /**
+   * Gossip shared data
+   * @param message
+   */
+  public void gossipSharedData(SharedGossipDataMessage message){
+    gossipManager.gossipSharedData(message);
+  }
+  
+  /**
+   * 
+   * @param key the key to search for
+   * @return
+   */
+  public SharedGossipDataMessage findSharedData(String key){
+    return getGossipManager().findSharedGossipData(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
index 19caffe..28de244 100644
--- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
+++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java
@@ -34,9 +34,10 @@ import org.apache.gossip.model.ActiveGossipOk;
 import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.GossipMember;
 import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpGossipDataMessage;
-
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -70,7 +71,10 @@ public class ActiveGossipThread {
             () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
             gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
     scheduledExecutorService.scheduleAtFixedRate(
-            () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+            () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
+            gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
+    scheduledExecutorService.scheduleAtFixedRate(
+            () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
             gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
   }
   
@@ -83,7 +87,39 @@ public class ActiveGossipThread {
     }
   }
 
-  public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
+  public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
+    LocalGossipMember member = selectPartner(memberList);
+    if (member == null) {
+      LOGGER.debug("Send sendMembershipList() is called without action");
+      return;
+    }
+    try (DatagramSocket socket = new DatagramSocket()) {
+      socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
+      for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
+          UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
+          message.setUuid(UUID.randomUUID().toString());
+          message.setUriFrom(me.getId());
+          message.setExpireAt(innerEntry.getValue().getExpireAt());
+          message.setKey(innerEntry.getValue().getKey());
+          message.setNodeId(innerEntry.getValue().getNodeId());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          message.setPayload(innerEntry.getValue().getPayload());
+          message.setTimestamp(innerEntry.getValue().getTimestamp());
+          byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
+          int packet_length = json_bytes.length;
+          if (packet_length < GossipManager.MAX_PACKET_SIZE) {
+            gossipCore.sendOneWay(message, member.getUri());
+          } else {
+            LOGGER.error("The length of the to be send message is too large ("
+                    + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
+          }
+      }
+    } catch (IOException e1) {
+      LOGGER.warn(e1);
+    }
+  }
+  
+  public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
     LocalGossipMember member = selectPartner(memberList);
     if (member == null) {
       LOGGER.debug("Send sendMembershipList() is called without action");

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/DataReaper.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java
index 237ffb6..4f4616b 100644
--- a/src/main/java/org/apache/gossip/manager/DataReaper.java
+++ b/src/main/java/org/apache/gossip/manager/DataReaper.java
@@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
 
 /**
  * We wish to periodically sweep user data and remove entries past their timestamp. This
@@ -28,12 +29,21 @@ public class DataReaper {
   
   public void init(){
     Runnable reapPerNodeData = () -> {
-      runOnce();
+      runPerNodeOnce();
+      runSharedOnce();
     };
     scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS);
   }
   
-  void runOnce(){
+  void runSharedOnce(){
+    for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){
+      if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
+        gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+  
+  void runPerNodeOnce(){
     for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
       reapData(node.getValue());
     }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipCore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java
index 08ec5b4..6dc4a5c 100644
--- a/src/main/java/org/apache/gossip/manager/GossipCore.java
+++ b/src/main/java/org/apache/gossip/manager/GossipCore.java
@@ -23,11 +23,13 @@ import org.apache.gossip.model.ActiveGossipMessage;
 import org.apache.gossip.model.Base;
 import org.apache.gossip.model.GossipDataMessage;
 import org.apache.gossip.model.Response;
+import org.apache.gossip.model.SharedGossipDataMessage;
 import org.apache.gossip.udp.Trackable;
 import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
 import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
 import org.apache.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -39,24 +41,35 @@ public class GossipCore {
   private ConcurrentHashMap<String, Base> requests;
   private ExecutorService service;
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
+  private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
   
   public GossipCore(GossipManager manager){
     this.gossipManager = manager;
     requests = new ConcurrentHashMap<>();
     service = Executors.newFixedThreadPool(500);
     perNodeData = new ConcurrentHashMap<>();
+    sharedData = new ConcurrentHashMap<>();
   }
   
+  public void addSharedData(SharedGossipDataMessage message){
+     SharedGossipDataMessage previous = sharedData.get(message.getKey());
+     if (previous == null){
+       sharedData.putIfAbsent(message.getKey(), message);
+     } else {
+       if (previous.getTimestamp() < message.getTimestamp()){
+         sharedData.replace(message.getKey(), previous, message);
+       }
+     }
+  }
 
   public void addPerNodeData(GossipDataMessage message){
     ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
     nodeMap.put(message.getKey(), message);
     nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
     if (nodeMap != null){
-      //m.put(message.getKey(), message);    //TODO only put if > ts
       GossipDataMessage current = nodeMap.get(message.getKey());
       if (current == null){
-        nodeMap.replace(message.getKey(), null, message);
+        nodeMap.putIfAbsent(message.getKey(), message);
       } else {
         if (current.getTimestamp() < message.getTimestamp()){
           nodeMap.replace(message.getKey(), current, message);
@@ -69,6 +82,10 @@ public class GossipCore {
     return perNodeData;
   }
   
+  public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
+    return sharedData;
+  }
+
   public void shutdown(){
     service.shutdown();
     try {
@@ -89,6 +106,10 @@ public class GossipCore {
       UdpGossipDataMessage message = (UdpGossipDataMessage) base;
       addPerNodeData(message);
     }
+    if (base instanceof SharedGossipDataMessage){
+      UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
+      addSharedData(message);
+    }
     if (base instanceof ActiveGossipMessage){
       List<GossipMember> remoteGossipMembers = new ArrayList<>();
       RemoteGossipMember senderMember = null;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java
index 3c66208..9f75fe3 100644
--- a/src/main/java/org/apache/gossip/manager/GossipManager.java
+++ b/src/main/java/org/apache/gossip/manager/GossipManager.java
@@ -44,6 +44,7 @@ import org.apache.gossip.event.GossipState;
 import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
 
 import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
 
 
 public abstract class GossipManager implements NotificationListener {
@@ -235,7 +236,15 @@ public abstract class GossipManager implements NotificationListener {
     gossipCore.addPerNodeData(message);
   }
   
-  public GossipDataMessage findGossipData(String nodeId, String key){
+  public void gossipSharedData(SharedGossipDataMessage message){
+    Objects.nonNull(message.getKey());
+    Objects.nonNull(message.getTimestamp());
+    Objects.nonNull(message.getPayload());
+    message.setNodeId(me.getId());
+    gossipCore.addSharedData(message);
+  }
+  
+  public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
     ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
     if (j == null){
       return null;
@@ -250,6 +259,18 @@ public abstract class GossipManager implements NotificationListener {
       return l;
     }
   }
+  
+  public SharedGossipDataMessage findSharedGossipData(String key){
+    SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
+    if (l == null){
+      return null;
+    }
+    if (l.getExpireAt() < clock.currentTimeMillis()){
+      return null;
+    } else {
+      return l;
+    }
+  }
 
   public DataReaper getDataReaper() {
     return dataReaper;

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/Base.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java
index 66c2be6..2bbb7af 100644
--- a/src/main/java/org/apache/gossip/model/Base.java
+++ b/src/main/java/org/apache/gossip/model/Base.java
@@ -4,6 +4,7 @@ import org.apache.gossip.udp.UdpActiveGossipMessage;
 import org.apache.gossip.udp.UdpActiveGossipOk;
 import org.apache.gossip.udp.UdpGossipDataMessage;
 import org.apache.gossip.udp.UdpNotAMemberFault;
+import org.apache.gossip.udp.UdpSharedGossipDataMessage;
 import org.codehaus.jackson.annotate.JsonSubTypes;
 import org.codehaus.jackson.annotate.JsonSubTypes.Type;
 import org.codehaus.jackson.annotate.JsonTypeInfo;
@@ -20,7 +21,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
         @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
         @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
         @Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
-        @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
+        @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"),
+        @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"),
+        @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage")
         })
 public class Base {
 

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
new file mode 100644
index 0000000..bac9ddf
--- /dev/null
+++ b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java
@@ -0,0 +1,47 @@
+package org.apache.gossip.model;
+
+public class SharedGossipDataMessage extends Base {
+
+  private String nodeId;
+  private String key;
+  private Object payload;
+  private Long timestamp;
+  private Long expireAt;
+  
+  public String getNodeId() {
+    return nodeId;
+  }
+  public void setNodeId(String nodeId) {
+    this.nodeId = nodeId;
+  }
+  public String getKey() {
+    return key;
+  }
+  public void setKey(String key) {
+    this.key = key;
+  }
+  public Object getPayload() {
+    return payload;
+  }
+  public void setPayload(Object payload) {
+    this.payload = payload;
+  }
+  public Long getTimestamp() {
+    return timestamp;
+  }
+  public void setTimestamp(Long timestamp) {
+    this.timestamp = timestamp;
+  }
+  public Long getExpireAt() {
+    return expireAt;
+  }
+  public void setExpireAt(Long expireAt) {
+    this.expireAt = expireAt;
+  }
+  @Override
+  public String toString() {
+    return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
+            + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
+  }  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
new file mode 100644
index 0000000..cb99759
--- /dev/null
+++ b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java
@@ -0,0 +1,31 @@
+package org.apache.gossip.udp;
+
+import org.apache.gossip.model.SharedGossipDataMessage;
+
+public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable {
+
+  private String uriFrom;
+  private String uuid;
+  
+  public String getUriFrom() {
+    return uriFrom;
+  }
+  
+  public void setUriFrom(String uriFrom) {
+    this.uriFrom = uriFrom;
+  }
+  
+  public String getUuid() {
+    return uuid;
+  }
+  
+  public void setUuid(String uuid) {
+    this.uuid = uuid;
+  }
+
+  @Override
+  public String toString() {
+    return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/DataTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java
index 02c89a8..4909bf8 100644
--- a/src/test/java/org/apache/gossip/DataTest.java
+++ b/src/test/java/org/apache/gossip/DataTest.java
@@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.gossip.event.GossipListener;
 import org.apache.gossip.event.GossipState;
 import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
 import org.junit.Test;
 
 import io.teknek.tunit.TUnit;
@@ -19,7 +20,7 @@ import io.teknek.tunit.TUnit;
 public class DataTest {
   
   @Test
-  public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
+  public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
     GossipSettings settings = new GossipSettings();
     String cluster = UUID.randomUUID().toString();
     int seedNodes = 1;
@@ -51,20 +52,32 @@ public class DataTest {
         return total;
       }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
     clients.get(0).gossipPerNodeData(msg());
+    clients.get(0).gossipSharedData(sharedMsg());
     Thread.sleep(10000);
     TUnit.assertThat(
-            
-            new Callable<Object> (){
+            new Callable<Object>() {
               public Object call() throws Exception {
-                GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a");
-                if (x == null) return "";
-                else return x.getPayload();
-              }})
-            
-            
-            //() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
-    .afterWaitingAtMost(20, TimeUnit.SECONDS)
-    .isEqualTo("b");
+                GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
+                if (x == null)
+                  return "";
+                else
+                  return x.getPayload();
+              }
+            }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
+    
+    
+    TUnit.assertThat(
+            new Callable<Object>() {
+              public Object call() throws Exception {
+                SharedGossipDataMessage x = clients.get(1).findSharedData("a");
+                if (x == null)
+                  return "";
+                else
+                  return x.getPayload();
+              }
+            }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
+    
+    
     for (int i = 0; i < clusterMembers; ++i) {
       clients.get(i).shutdown();
     }
@@ -78,4 +91,14 @@ public class DataTest {
     g.setTimestamp(System.currentTimeMillis());
     return g;
   }
+  
+  private SharedGossipDataMessage sharedMsg(){
+    SharedGossipDataMessage g = new SharedGossipDataMessage();
+    g.setExpireAt(Long.MAX_VALUE);
+    g.setKey("a");
+    g.setPayload("c");
+    g.setTimestamp(System.currentTimeMillis());
+    return g;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
index 4cd5dfe..d0164b1 100644
--- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java
+++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java
@@ -5,6 +5,7 @@ import java.net.URI;
 import org.apache.gossip.GossipSettings;
 import org.apache.gossip.manager.random.RandomGossipManager;
 import org.apache.gossip.model.GossipDataMessage;
+import org.apache.gossip.model.SharedGossipDataMessage;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -21,9 +22,13 @@ public class DataReaperTest {
     GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
             .withId(myId).uri(URI.create("udp://localhost:5000")).build();
     gm.gossipPerNodeData(perNodeDatum(key, value));
-    Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
-    gm.getDataReaper().runOnce();
-    TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null);
+    gm.gossipSharedData(sharedDatum(key, value));
+    Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
+    Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
+    gm.getDataReaper().runPerNodeOnce();
+    gm.getDataReaper().runSharedOnce();
+    TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
+    TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
   }
 
   private GossipDataMessage perNodeDatum(String key, String value) {
@@ -34,6 +39,16 @@ public class DataReaperTest {
     m.setTimestamp(System.currentTimeMillis());
     return m;
   }
+  
+  private SharedGossipDataMessage sharedDatum(String key, String value) {
+    SharedGossipDataMessage m = new SharedGossipDataMessage();
+    m.setExpireAt(System.currentTimeMillis() + 5L);
+    m.setKey(key);
+    m.setPayload(value);
+    m.setTimestamp(System.currentTimeMillis());
+    return m;
+  }
+  
 
   @Test
   public void testHigherTimestampWins() {
@@ -47,9 +62,9 @@ public class DataReaperTest {
     GossipDataMessage after = perNodeDatum(key, "b");
     after.setTimestamp(after.getTimestamp() - 1);
     gm.gossipPerNodeData(before);
-    Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+    Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
     gm.gossipPerNodeData(after);
-    Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
+    Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
   }
 
 }


Mime
View raw message