Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C6246200B9A for ; Fri, 7 Oct 2016 09:08:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C4245160AE8; Fri, 7 Oct 2016 07:08:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 68925160AD6 for ; Fri, 7 Oct 2016 09:08:16 +0200 (CEST) Received: (qmail 55791 invoked by uid 500); 7 Oct 2016 07:08:15 -0000 Mailing-List: contact commits-help@gossip.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gossip.incubator.apache.org Delivered-To: mailing list commits@gossip.incubator.apache.org Received: (qmail 55782 invoked by uid 99); 7 Oct 2016 07:08:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Oct 2016 07:08:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 2F9DB18062B for ; Fri, 7 Oct 2016 07:08:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id co2XzLfEF7_L for ; Fri, 7 Oct 2016 07:08:10 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 702975F56F for ; Fri, 7 Oct 2016 07:08:09 +0000 (UTC) Received: (qmail 55724 invoked by uid 99); 7 Oct 2016 07:08:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Oct 2016 07:08:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7BE99DFDC4; Fri, 7 Oct 2016 07:08:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecapriolo@apache.org To: commits@gossip.incubator.apache.org Message-Id: <8de90da9c9d14075ae000dd031727846@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-gossip git commit: GOSSIP-26 Gossip shared data Date: Fri, 7 Oct 2016 07:08:08 +0000 (UTC) archived-at: Fri, 07 Oct 2016 07:08:18 -0000 Repository: incubator-gossip Updated Branches: refs/heads/master f35dddd8f -> 201b101a9 GOSSIP-26 Gossip shared data Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/201b101a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/201b101a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/201b101a Branch: refs/heads/master Commit: 201b101a91cf02d4ef2b0d9536cf0ceda99f6115 Parents: f35dddd Author: Edward Capriolo Authored: Fri Oct 7 03:04:59 2016 -0400 Committer: Edward Capriolo Committed: Fri Oct 7 03:04:59 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/gossip/GossipService.java | 21 ++++++++- .../gossip/manager/ActiveGossipThread.java | 42 +++++++++++++++-- .../org/apache/gossip/manager/DataReaper.java | 14 +++++- .../org/apache/gossip/manager/GossipCore.java | 25 ++++++++++- .../apache/gossip/manager/GossipManager.java | 23 +++++++++- src/main/java/org/apache/gossip/model/Base.java | 5 ++- .../gossip/model/SharedGossipDataMessage.java | 47 ++++++++++++++++++++ .../gossip/udp/UdpSharedGossipDataMessage.java | 31 +++++++++++++ src/test/java/org/apache/gossip/DataTest.java | 47 +++++++++++++++----- .../apache/gossip/manager/DataReaperTest.java | 25 ++++++++--- 10 files changed, 252 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/GossipService.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index ab0da97..e50f260 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -24,7 +24,8 @@ import java.util.List; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.log4j.Logger; /** @@ -97,7 +98,23 @@ public class GossipService { * @return return the value if found or null if not found or expired */ public GossipDataMessage findPerNodeData(String nodeId, String key){ - return getGossipManager().findGossipData(nodeId, key); + return getGossipManager().findPerNodeGossipData(nodeId, key); } + /** + * Gossip shared data + * @param message + */ + public void gossipSharedData(SharedGossipDataMessage message){ + gossipManager.gossipSharedData(message); + } + + /** + * + * @param key the key to search for + * @return + */ + public SharedGossipDataMessage findSharedData(String key){ + return getGossipManager().findSharedGossipData(key); + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 19caffe..28de244 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -34,9 +34,10 @@ import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpGossipDataMessage; - +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -70,7 +71,10 @@ public class ActiveGossipThread { () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( - () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } @@ -83,7 +87,39 @@ public class ActiveGossipThread { } } - public void sendData(LocalGossipMember me, List memberList){ + public void sendSharedData(LocalGossipMember me, List memberList){ + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + for (Entry innerEntry : this.gossipCore.getSharedData().entrySet()){ + UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + gossipCore.sendOneWay(message, member.getUri()); + } else { + LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + } catch (IOException e1) { + LOGGER.warn(e1); + } + } + + public void sendPerNodeData(LocalGossipMember me, List memberList){ LocalGossipMember member = selectPartner(memberList); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/DataReaper.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java index 237ffb6..4f4616b 100644 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; /** * We wish to periodically sweep user data and remove entries past their timestamp. This @@ -28,12 +29,21 @@ public class DataReaper { public void init(){ Runnable reapPerNodeData = () -> { - runOnce(); + runPerNodeOnce(); + runSharedOnce(); }; scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS); } - void runOnce(){ + void runSharedOnce(){ + for (Entry entry : gossipCore.getSharedData().entrySet()){ + if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ + gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); + } + } + } + + void runPerNodeOnce(){ for (Entry> node : gossipCore.getPerNodeData().entrySet()){ reapData(node.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 08ec5b4..6dc4a5c 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -23,11 +23,13 @@ import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -39,24 +41,35 @@ public class GossipCore { private ConcurrentHashMap requests; private ExecutorService service; private final ConcurrentHashMap> perNodeData; + private final ConcurrentHashMap sharedData; public GossipCore(GossipManager manager){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); service = Executors.newFixedThreadPool(500); perNodeData = new ConcurrentHashMap<>(); + sharedData = new ConcurrentHashMap<>(); } + public void addSharedData(SharedGossipDataMessage message){ + SharedGossipDataMessage previous = sharedData.get(message.getKey()); + if (previous == null){ + sharedData.putIfAbsent(message.getKey(), message); + } else { + if (previous.getTimestamp() < message.getTimestamp()){ + sharedData.replace(message.getKey(), previous, message); + } + } + } public void addPerNodeData(GossipDataMessage message){ ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); if (nodeMap != null){ - //m.put(message.getKey(), message); //TODO only put if > ts GossipDataMessage current = nodeMap.get(message.getKey()); if (current == null){ - nodeMap.replace(message.getKey(), null, message); + nodeMap.putIfAbsent(message.getKey(), message); } else { if (current.getTimestamp() < message.getTimestamp()){ nodeMap.replace(message.getKey(), current, message); @@ -69,6 +82,10 @@ public class GossipCore { return perNodeData; } + public ConcurrentHashMap getSharedData() { + return sharedData; + } + public void shutdown(){ service.shutdown(); try { @@ -89,6 +106,10 @@ public class GossipCore { UdpGossipDataMessage message = (UdpGossipDataMessage) base; addPerNodeData(message); } + if (base instanceof SharedGossipDataMessage){ + UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base; + addSharedData(message); + } if (base instanceof ActiveGossipMessage){ List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 3c66208..9f75fe3 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -44,6 +44,7 @@ import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; public abstract class GossipManager implements NotificationListener { @@ -235,7 +236,15 @@ public abstract class GossipManager implements NotificationListener { gossipCore.addPerNodeData(message); } - public GossipDataMessage findGossipData(String nodeId, String key){ + public void gossipSharedData(SharedGossipDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + gossipCore.addSharedData(message); + } + + public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ return null; @@ -250,6 +259,18 @@ public abstract class GossipManager implements NotificationListener { return l; } } + + public SharedGossipDataMessage findSharedGossipData(String key){ + SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() < clock.currentTimeMillis()){ + return null; + } else { + return l; + } + } public DataReaper getDataReaper() { return dataReaper; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/Base.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java index 66c2be6..2bbb7af 100644 --- a/src/main/java/org/apache/gossip/model/Base.java +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -4,6 +4,7 @@ import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonSubTypes.Type; import org.codehaus.jackson.annotate.JsonTypeInfo; @@ -20,7 +21,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), @Type(value = GossipDataMessage.class, name = "GossipDataMessage"), - @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage") + @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"), + @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"), + @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage") }) public class Base { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java new file mode 100644 index 0000000..bac9ddf --- /dev/null +++ b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java @@ -0,0 +1,47 @@ +package org.apache.gossip.model; + +public class SharedGossipDataMessage extends Base { + + private String nodeId; + private String key; + private Object payload; + private Long timestamp; + private Long expireAt; + + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public String getKey() { + return key; + } + public void setKey(String key) { + this.key = key; + } + public Object getPayload() { + return payload; + } + public void setPayload(Object payload) { + this.payload = payload; + } + public Long getTimestamp() { + return timestamp; + } + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + public Long getExpireAt() { + return expireAt; + } + public void setExpireAt(Long expireAt) { + this.expireAt = expireAt; + } + @Override + public String toString() { + return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java new file mode 100644 index 0000000..cb99759 --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java @@ -0,0 +1,31 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.SharedGossipDataMessage; + +public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 02c89a8..4909bf8 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Test; import io.teknek.tunit.TUnit; @@ -19,7 +20,7 @@ import io.teknek.tunit.TUnit; public class DataTest { @Test - public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; @@ -51,20 +52,32 @@ public class DataTest { return total; }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); clients.get(0).gossipPerNodeData(msg()); + clients.get(0).gossipSharedData(sharedMsg()); Thread.sleep(10000); TUnit.assertThat( - - new Callable (){ + new Callable() { public Object call() throws Exception { - GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a"); - if (x == null) return ""; - else return x.getPayload(); - }}) - - - //() -> clients.get(1).findGossipData(1+"" , "a").getPayload()) - .afterWaitingAtMost(20, TimeUnit.SECONDS) - .isEqualTo("b"); + GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); + + + TUnit.assertThat( + new Callable() { + public Object call() throws Exception { + SharedGossipDataMessage x = clients.get(1).findSharedData("a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + + for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } @@ -78,4 +91,14 @@ public class DataTest { g.setTimestamp(System.currentTimeMillis()); return g; } + + private SharedGossipDataMessage sharedMsg(){ + SharedGossipDataMessage g = new SharedGossipDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("c"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/201b101a/src/test/java/org/apache/gossip/manager/DataReaperTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index 4cd5dfe..d0164b1 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -5,6 +5,7 @@ import java.net.URI; import org.apache.gossip.GossipSettings; import org.apache.gossip.manager.random.RandomGossipManager; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Assert; import org.junit.Test; @@ -21,9 +22,13 @@ public class DataReaperTest { GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) .withId(myId).uri(URI.create("udp://localhost:5000")).build(); gm.gossipPerNodeData(perNodeDatum(key, value)); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); - gm.getDataReaper().runOnce(); - TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null); + gm.gossipSharedData(sharedDatum(key, value)); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload()); + gm.getDataReaper().runPerNodeOnce(); + gm.getDataReaper().runSharedOnce(); + TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null); + TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); } private GossipDataMessage perNodeDatum(String key, String value) { @@ -34,6 +39,16 @@ public class DataReaperTest { m.setTimestamp(System.currentTimeMillis()); return m; } + + private SharedGossipDataMessage sharedDatum(String key, String value) { + SharedGossipDataMessage m = new SharedGossipDataMessage(); + m.setExpireAt(System.currentTimeMillis() + 5L); + m.setKey(key); + m.setPayload(value); + m.setTimestamp(System.currentTimeMillis()); + return m; + } + @Test public void testHigherTimestampWins() { @@ -47,9 +62,9 @@ public class DataReaperTest { GossipDataMessage after = perNodeDatum(key, "b"); after.setTimestamp(after.getTimestamp() - 1); gm.gossipPerNodeData(before); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); gm.gossipPerNodeData(after); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); } }