Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 44672200CBC for ; Tue, 20 Jun 2017 15:05:02 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 42D96160BE1; Tue, 20 Jun 2017 13:05:02 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B53BD160BD3 for ; Tue, 20 Jun 2017 15:05:00 +0200 (CEST) Received: (qmail 81400 invoked by uid 500); 20 Jun 2017 13:04:59 -0000 Mailing-List: contact commits-help@gossip.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gossip.incubator.apache.org Delivered-To: mailing list commits@gossip.incubator.apache.org Received: (qmail 81383 invoked by uid 99); 20 Jun 2017 13:04:59 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Jun 2017 13:04:59 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 880D01B0E00 for ; Tue, 20 Jun 2017 13:04:58 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.221 X-Spam-Level: X-Spam-Status: No, score=-4.221 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id hXBXzRfDbwL4 for ; Tue, 20 Jun 2017 13:04:51 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 951C55F2AE for ; Tue, 20 Jun 2017 13:04:50 +0000 (UTC) Received: (qmail 81284 invoked by uid 99); 20 Jun 2017 13:04:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Jun 2017 13:04:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7EE2FDFAF5; Tue, 20 Jun 2017 13:04:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecapriolo@apache.org To: commits@gossip.incubator.apache.org Date: Tue, 20 Jun 2017 13:04:49 -0000 Message-Id: <1f2210afc88645358872bc3819ed17f5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-gossip git commit: GOSSIP-55 Added event handlers to notify share data and per node data changes archived-at: Tue, 20 Jun 2017 13:05:02 -0000 Repository: incubator-gossip Updated Branches: refs/heads/master 602a79bfc -> c009b77d2 GOSSIP-55 Added event handlers to notify share data and per node data changes Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/ade33a9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/ade33a9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/ade33a9e Branch: refs/heads/master Commit: ade33a9e58046cf3c9c4d05edd9ef58f3d814e9f Parents: 9c9d96e Author: Mirage Abeysekara Authored: Thu Jun 1 22:21:14 2017 +0530 Committer: Miraj Abeysekara Committed: Tue Jun 20 18:17:07 2017 +0530 ---------------------------------------------------------------------- .../gossip/event/data/DataEventConstants.java | 42 ++++ .../gossip/event/data/DataEventManager.java | 102 +++++++++ .../event/data/UpdateNodeDataEventHandler.java | 37 ++++ .../data/UpdateSharedDataEventHandler.java | 34 +++ .../org/apache/gossip/manager/GossipCore.java | 45 +++- .../apache/gossip/manager/GossipManager.java | 18 ++ .../gossip/event/data/DataEventManagerTest.java | 166 +++++++++++++++ .../examples/StandAloneNodeCrdtOrSet.java | 13 +- .../org/apache/gossip/PerNodeDataEventTest.java | 130 ++++++++++++ .../org/apache/gossip/SharedDataEventTest.java | 205 +++++++++++++++++++ 10 files changed, 786 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java new file mode 100644 index 0000000..217087f --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +public class DataEventConstants { + + // MetricRegistry + public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE + = "gossip.event.data.pernode.subscribers.size"; + public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE + = "gossip.event.data.pernode.subscribers.queue.size"; + public static final String SHARED_DATA_SUBSCRIBERS_SIZE + = "gossip.event.data.shared.subscribers.size"; + public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE + = "gossip.event.data.shared.subscribers.queue.size"; + + // Thread pool + public static final int PER_NODE_DATA_QUEUE_SIZE = 64; + public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1; + public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30; + public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1; + public static final int SHARED_DATA_QUEUE_SIZE = 64; + public static final int SHARED_DATA_CORE_POOL_SIZE = 1; + public static final int SHARED_DATA_MAX_POOL_SIZE = 30; + public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1; + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java new file mode 100644 index 0000000..3124df1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class DataEventManager { + + private final List perNodeDataHandlers; + private final BlockingQueue perNodeDataHandlerQueue; + private final ExecutorService perNodeDataEventExecutor; + private final List sharedDataHandlers; + private final BlockingQueue sharedDataHandlerQueue; + private final ExecutorService sharedDataEventExecutor; + + public DataEventManager(MetricRegistry metrics) { + perNodeDataHandlers = new CopyOnWriteArrayList<>(); + perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE); + perNodeDataEventExecutor = new ThreadPoolExecutor( + DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE, + DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE, + DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS, + perNodeDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + + sharedDataHandlers = new CopyOnWriteArrayList<>(); + sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE); + sharedDataEventExecutor = new ThreadPoolExecutor(DataEventConstants.SHARED_DATA_CORE_POOL_SIZE, + DataEventConstants.SHARED_DATA_MAX_POOL_SIZE, + DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS, + sharedDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + + metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE, + (Gauge) () -> perNodeDataHandlers.size()); + metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE, + (Gauge) () -> perNodeDataHandlerQueue.size()); + metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE, + (Gauge) () -> sharedDataHandlers.size()); + metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE, + (Gauge) () -> sharedDataHandlerQueue.size()); + + } + + public void notifySharedData(final String key, final Object newValue, final Object oldValue) { + sharedDataHandlers.forEach(handler -> sharedDataEventExecutor + .execute(() -> handler.onUpdate(key, oldValue, newValue))); + } + + public void notifyPerNodeData(final String nodeId, final String key, final Object newValue, + final Object oldValue) { + perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor + .execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue))); + } + + public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) { + perNodeDataHandlers.add(handler); + } + + public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) { + perNodeDataHandlers.remove(handler); + } + + public int getPerNodeSubscribersSize() { + return perNodeDataHandlers.size(); + } + + public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) { + sharedDataHandlers.add(handler); + } + + public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) { + sharedDataHandlers.remove(handler); + } + + public int getSharedDataSubscribersSize() { + return sharedDataHandlers.size(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java new file mode 100644 index 0000000..ca88c17 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +/** + * Event handler interface for the per node data items. + * Classes which implement this interface get notifications when per node data item get changed. + */ +public interface UpdateNodeDataEventHandler { + + /** + * This method get called when a per node datum get changed. + * + * @param nodeId id of the node that change the value + * @param key key of the datum + * @param oldValue previous value of the datum or null if the datum is discovered + * for the first time + * @param newValue updated value of the datum + */ + void onUpdate(String nodeId, String key, Object oldValue, Object newValue); + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java new file mode 100644 index 0000000..5655732 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +/** + * Event handler interface for shared data items. + * Classes which implement this interface get notifications when shared data get changed. + */ +public interface UpdateSharedDataEventHandler { + /** + * This method get called when shared data get changed. + * + * @param key key of the shared data item + * @param oldValue previous value or null if the data is discovered for the first time + * @param newValue updated value of the data item + */ + void onUpdate(String key, Object oldValue, Object newValue); + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java index e034432..4167664 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -20,12 +20,18 @@ package org.apache.gossip.manager; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.Member; import org.apache.gossip.LocalMember; +import org.apache.gossip.Member; import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipState; -import org.apache.gossip.model.*; +import org.apache.gossip.event.data.DataEventManager; +import org.apache.gossip.event.data.UpdateNodeDataEventHandler; +import org.apache.gossip.event.data.UpdateSharedDataEventHandler; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.udp.Trackable; import org.apache.log4j.Logger; @@ -55,13 +61,15 @@ public class GossipCore implements GossipCoreConstants { private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; - + private final DataEventManager eventManager; + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); + eventManager = new DataEventManager(metrics); metrics.register(WORKQUEUE_SIZE, (Gauge)() -> workQueue.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge)() -> perNodeData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge)() -> sharedData.size()); @@ -76,6 +84,7 @@ public class GossipCore implements GossipCoreConstants { while (true){ SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ + eventManager.notifySharedData(message.getKey(), message.getPayload(), null); return; } if (message.getPayload() instanceof Crdt){ @@ -88,12 +97,17 @@ public class GossipCore implements GossipCoreConstants { merged.setPayload(mergedCrdt); boolean replaced = sharedData.replace(message.getKey(), previous, merged); if (replaced){ + if(!merged.getPayload().equals(previous.getPayload())) { + eventManager + .notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload()); + } return; } } else { if (previous.getTimestamp() < message.getTimestamp()){ boolean result = sharedData.replace(message.getKey(), previous, message); if (result){ + eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload()); return; } } else { @@ -102,7 +116,7 @@ public class GossipCore implements GossipCoreConstants { } } } - + public void addPerNodeData(PerNodeDataMessage message){ ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); @@ -111,11 +125,16 @@ public class GossipCore implements GossipCoreConstants { PerNodeDataMessage current = nodeMap.get(message.getKey()); if (current == null){ nodeMap.putIfAbsent(message.getKey(), message); + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null); } else { if (current.getTimestamp() < message.getTimestamp()){ nodeMap.replace(message.getKey(), current, message); + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), + current.getPayload()); } } + } else { + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null); } } @@ -178,7 +197,7 @@ public class GossipCore implements GossipCoreConstants { sendInternal(message, uri); if (latchAndBase == null){ return null; - } + } try { boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); @@ -297,4 +316,20 @@ public class GossipCore implements GossipCoreConstants { } } } + + void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + eventManager.registerPerNodeDataSubscriber(handler); + } + + void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + eventManager.registerSharedDataSubscriber(handler); + } + + void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + eventManager.unregisterPerNodeDataSubscriber(handler); + } + + void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + eventManager.unregisterSharedDataSubscriber(handler); + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index 133a79f..d839b2e 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -26,6 +26,8 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.apache.gossip.event.data.UpdateNodeDataEventHandler; +import org.apache.gossip.event.data.UpdateSharedDataEventHandler; import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; @@ -348,4 +350,20 @@ public abstract class GossipManager { return new File(manager.getSettings().getPathToDataState(), "pernodedata." + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json"); } + + public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + gossipCore.registerPerNodeDataSubscriber(handler); + } + + public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + gossipCore.registerSharedDataSubscriber(handler); + } + + public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + gossipCore.unregisterPerNodeDataSubscriber(handler); + } + + public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + gossipCore.unregisterSharedDataSubscriber(handler); + } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java new file mode 100644 index 0000000..d9d778f --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +import com.codahale.metrics.MetricRegistry; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class DataEventManagerTest { + + private static Semaphore semaphore; + private String receivedNodeId; + private String receivedKey; + private Object receivedNewValue; + private Object receivedOldValue; + + @BeforeClass + public static void setup() { + semaphore = new Semaphore(0); + } + + @Test + public void perNodeDataEventHandlerAddRemoveTest() { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + + UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> { + }; + + eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler); + Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize()); + eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler); + Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize()); + } + + // Test whether the per node data events are fired for matching key + @Test + public void perNodeDataEventHandlerTest() throws InterruptedException { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + resetData(); + + // A new subscriber "Juliet" is like to notified when per node data change for the key "Romeo" + UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> { + if(!key.equals("Romeo")) return; + receivedNodeId = nodeId; + receivedKey = key; + receivedNewValue = newValue; + receivedOldValue = oldValue; + semaphore.release(); + }; + // Juliet register with eventManager + eventManager.registerPerNodeDataSubscriber(juliet); + // Romeo is going to sleep after having dinner + eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating"); + + // Juliet should notified + semaphore.tryAcquire(2, TimeUnit.SECONDS); + Assert.assertEquals("Montague", receivedNodeId); + Assert.assertEquals("Romeo", receivedKey); + Assert.assertEquals("sleeping", receivedNewValue); + Assert.assertEquals("eating", receivedOldValue); + + eventManager.unregisterPerNodeDataSubscriber(juliet); + } + + @Test + public void sharedDataEventHandlerAddRemoveTest() { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + + UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> { + + }; + eventManager.registerSharedDataSubscriber(sharedDataEventHandler); + Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize()); + eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler); + Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize()); + + } + + // Test whether the shared data events are fired + @Test + public void sharedDataEventHandlerTest() throws InterruptedException { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + resetData(); + + // A new subscriber "Alice" is like to notified when shared data change for the key "technology" + UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> { + if(!key.equals("technology")) return; + receivedKey = key; + receivedNewValue = newValue; + receivedOldValue = oldValue; + semaphore.release(); + }; + // Alice register with eventManager + eventManager.registerSharedDataSubscriber(alice); + + // technology key get changed + eventManager.notifySharedData("technology", "Java has lambda", "Java is fast"); + + // Alice should notified + semaphore.tryAcquire(2, TimeUnit.SECONDS); + Assert.assertEquals("technology", receivedKey); + Assert.assertEquals("Java has lambda", receivedNewValue); + Assert.assertEquals("Java is fast", receivedOldValue); + + eventManager.unregisterSharedDataSubscriber(alice); + } + + // Test the MetricRegistry + @Test + public void metricRegistryTest() { + MetricRegistry registry = new MetricRegistry(); + + DataEventManager eventManager = new DataEventManager(registry); + + UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> { + }; + + UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> { + }; + + eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler); + eventManager.registerSharedDataSubscriber(sharedDataEventHandler); + + Assert.assertEquals(1, + registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue()); + Assert.assertEquals(0, + registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE) + .getValue()); + Assert.assertEquals(1, + registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue()); + Assert.assertEquals(0, + registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE) + .getValue()); + + } + + private void resetData() { + receivedNodeId = null; + receivedKey = null; + receivedNewValue = null; + receivedOldValue = null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java ---------------------------------------------------------------------- diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java index 7d4db93..78c7782 100644 --- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -79,10 +79,21 @@ public class StandAloneNodeCrdtOrSet { } else if (op == 'g') { gcount(val, gossipService); } + if (op == 'l') { + listen(val, gossipService); + } } } } - + + private static void listen(String val, GossipManager gossipManager) { + gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> { + if (key.equals(val)) { + System.out.println("Event Handler fired! " + oldValue + " " + newValue); + } + }); + } + private static void gcount(String val, GossipManager gossipManager) { GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def"); Long l = Long.valueOf(val); http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java new file mode 100644 index 0000000..59136d1 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.PerNodeDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class PerNodeDataEventTest extends AbstractIntegrationBase { + + private String receivedKey = ""; + private String receivingNodeId = ""; + private Object receivingNodeDataNewValue = ""; + private Object receivingNodeDataOldValue = ""; + private Semaphore lock = new Semaphore(0); + + + @Test + public void perNodeDataEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new data to Node 1 + clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed")); + + // Node 2 is interested in data changes for the key "organization" and "category" + clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> { + if (!key.equals("organization") && !key.equals("category")) return; + receivingNodeId = nodeId; + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Node 2 first time adds Node 1 data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("category", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("distributed", receivingNodeDataNewValue); + + // Node 1 adds new per node data + clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache")); + // Node 2 adds new data key from Node 1 + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("apache", receivingNodeDataNewValue); + + // Node 1 updates its value + clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip")); + // Node 2 updates existing value + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals("apache", receivingNodeDataOldValue); + Assert.assertEquals("apache-gossip", receivingNodeDataNewValue); + + } + + private PerNodeDataMessage getPerNodeData(String key, String value) { + PerNodeDataMessage g = new PerNodeDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/ade33a9e/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java new file mode 100644 index 0000000..56f1657 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.crdt.GrowOnlyCounter; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class SharedDataEventTest extends AbstractIntegrationBase { + + private String receivedKey = ""; + private Object receivingNodeDataNewValue = ""; + private Object receivingNodeDataOldValue = ""; + private String gCounterKey = "gCounter"; + private Semaphore lock = new Semaphore(0); + + @Test + public void sharedDataEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new data to Node 1 + clients.get(0).gossipSharedData(sharedNodeData("category", "distributed")); + + // Node 2 is interested in data changes for the key "organization" and "category" + clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> { + if (!key.equals("organization") && !key.equals("category")) + return; + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Node 2 first time gets shared data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("category", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("distributed", receivingNodeDataNewValue); + + // Node 1 adds new per node data + clients.get(0).gossipSharedData(sharedNodeData("organization", "apache")); + // Node 2 adds new shared data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("apache", receivingNodeDataNewValue); + + // Node 1 updates its value + clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip")); + + // Node 2 updates existing value + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals("apache", receivingNodeDataOldValue); + Assert.assertEquals("apache-gossip", receivingNodeDataNewValue); + + } + + @Test + public void CrdtDataChangeEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 3; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> { + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Add initial gCounter to Node 1 + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + + // Check if initial Crdt received + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("gCounter", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter); + Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue()); + + // Node 3 Updates the gCounter by 4 + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey); + GrowOnlyCounter gcNew = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(2)).increment(4L)); + + d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(gcNew); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(2).merge(d); + + // Check if Node 3's Crdt update is received in Node 2 event handler + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("gCounter", receivedKey); + Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter); + Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue()); + Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter); + Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue()); + + } + + private SharedDataMessage sharedNodeData(String key, String value) { + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +}