From: ecapriolo@apache.org
To: commits@gossip.incubator.apache.org
Reply-To: dev@gossip.incubator.apache.org
Date: Sat, 06 May 2017 15:51:43 -0000
Message-Id: <76cbf11cee1b4e278c2b655f69837452@git.apache.org>
In-Reply-To: <8ee0a59916e3441cb8d9fc353be73e16@git.apache.org>
References: <8ee0a59916e3441cb8d9fc353be73e16@git.apache.org>
Subject: [2/2] incubator-gossip git commit: GOSSIP-85 Factor out PassiveGossipThread
archived-at: Sat, 06 May 2017 15:52:04 -0000

GOSSIP-85 Factor out PassiveGossipThread

Project: http://git-wip-us.apache.org/repos/asf/incubator-gossip/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gossip/commit/c62ebaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gossip/tree/c62ebaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gossip/diff/c62ebaf9

Branch: refs/heads/master
Commit: c62ebaf9b6054b669ee77d61497d51d1b382309d
Parents: e3010c8
Author: Edward Capriolo
Authored: Sat Apr 29 19:45:16 2017 -0400
Committer: Edward Capriolo
Committed: Wed May 3 16:57:50 2017 -0400

----------------------------------------------------------------------
 gossip-base/pom.xml | 2 +-
 .../java/org/apache/gossip/GossipSettings.java | 3 +-
 .../apache/gossip/manager/GossipManager.java | 4 +-
 .../gossip/manager/PassiveGossipThread.java | 76 ------
 .../gossip/manager/RingStatePersister.java | 3 -
 .../transport/AbstractTransportManager.java | 16 +-
.../test/java/org/apache/gossip/DataTest.java | 240 ------------------- .../org/apache/gossip/IdAndPropertyTest.java | 93 ------- .../org/apache/gossip/ShutdownDeadtimeTest.java | 150 ------------ .../org/apache/gossip/SignedMessageTest.java | 119 --------- .../org/apache/gossip/StartupSettingsTest.java | 91 ------- .../org/apache/gossip/TenNodeThreeSeedTest.java | 94 -------- .../manager/handlers/MessageHandlerTest.java | 2 +- .../transport/UnitTestTransportManager.java | 3 +- gossip-itest/pom.xml | 88 +++++++ .../test/java/org/apache/gossip/DataTest.java | 238 ++++++++++++++++++ .../org/apache/gossip/IdAndPropertyTest.java | 91 +++++++ .../org/apache/gossip/ShutdownDeadtimeTest.java | 148 ++++++++++++ .../org/apache/gossip/SignedMessageTest.java | 117 +++++++++ .../org/apache/gossip/StartupSettingsTest.java | 91 +++++++ .../org/apache/gossip/TenNodeThreeSeedTest.java | 92 +++++++ gossip-protocol-jackson/pom.xml | 6 +- .../gossip/protocol/json/JacksonTest.java | 6 - .../gossip/protocol/json/TestMessage.java | 1 + gossip-transport-udp/pom.xml | 4 +- .../transport/udp/UdpTransportManager.java | 54 ++++- .../udp/UdpTransportIntegrationTest.java | 5 - pom.xml | 1 + 28 files changed, 931 insertions(+), 907 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml index b9739f6..34f346c 100644 --- a/gossip-base/pom.xml +++ b/gossip-base/pom.xml @@ -75,4 +75,4 @@ - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java 
b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 2ceb453..792af85 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -44,7 +44,7 @@ public class GossipSettings { private String distribution = "normal"; private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; - + private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; @@ -241,4 +241,5 @@ public class GossipSettings { public void setProtocolManagerClass(String protocolManagerClass) { this.protocolManagerClass = protocolManagerClass; } + } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index b1752cd..133a79f 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -50,7 +50,9 @@ public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); // this mapper is used for ring and user-data persistence only. NOT messages. 
- public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{ + public static final ObjectMapper metdataObjectMapper = new ObjectMapper() { + private static final long serialVersionUID = 1L; + { enableDefaultTyping(); configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); }}; http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java deleted file mode 100644 index 03a874c..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.io.IOException; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.gossip.model.Base; -import org.apache.log4j.Logger; - - -/** - * This class handles the passive cycle, - * where this client has received an incoming message. 
- */ -public class PassiveGossipThread implements Runnable { - - public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); - - - private final AtomicBoolean keepRunning; - private final GossipCore gossipCore; - private final GossipManager gossipManager; - - public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - this.gossipManager = gossipManager; - this.gossipCore = gossipCore; - if (gossipManager.getMyself().getClusterName() == null){ - throw new IllegalArgumentException("Cluster was null"); - } - - keepRunning = new AtomicBoolean(true); - } - - @Override - public void run() { - while (keepRunning.get()) { - try { - byte[] buf = gossipManager.getTransportManager().read(); - try { - Base message = gossipManager.getProtocolManager().read(buf); - gossipCore.receive(message); - gossipManager.getMemberStateRefresher().run(); - } catch (RuntimeException ex) {//TODO trap json exception - LOGGER.error("Unable to process message", ex); - } - } catch (IOException e) { - // InterruptedException are completely normal here because of the blocking lifecycle. 
- if (!(e.getCause() instanceof InterruptedException)) { - LOGGER.error(e); - } - keepRunning.set(false); - } - } - } - - public void requestStop() { - keepRunning.set(false); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 0af9f12..5334ad4 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -22,14 +22,11 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.LocalMember; -import org.apache.gossip.crdt.CrdtModule; import org.apache.log4j.Logger; public class RingStatePersister implements Runnable { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java index 33db038..82b0dfb 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java @@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry; import 
org.apache.gossip.manager.AbstractActiveGossiper; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.PassiveGossipThread; import org.apache.gossip.utils.ReflectionUtils; import org.apache.log4j.Logger; @@ -36,14 +35,14 @@ public abstract class AbstractTransportManager implements TransportManager { public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class); - private final PassiveGossipThread passiveGossipThread; private final ExecutorService gossipThreadExecutor; - private final AbstractActiveGossiper activeGossipThread; + protected final GossipManager gossipManager; + protected final GossipCore gossipCore; public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) { - - passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore); + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; gossipThreadExecutor = Executors.newCachedThreadPool(); activeGossipThread = ReflectionUtils.constructWithReflection( gossipManager.getSettings().getActiveGossipClass(), @@ -58,7 +57,6 @@ public abstract class AbstractTransportManager implements TransportManager { // shut down threads etc. 
@Override public void shutdown() { - passiveGossipThread.requestStop(); gossipThreadExecutor.shutdown(); if (activeGossipThread != null) { activeGossipThread.shutdown(); @@ -77,11 +75,9 @@ public abstract class AbstractTransportManager implements TransportManager { @Override public void startActiveGossiper() { - activeGossipThread.init(); + activeGossipThread.init(); } @Override - public void startEndpoint() { - gossipThreadExecutor.execute(passiveGossipThread); - } + public abstract void startEndpoint(); } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-base/src/test/java/org/apache/gossip/DataTest.java deleted file mode 100644 index bb33dc2..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.crdt.GrowOnlyCounter; -import org.apache.gossip.crdt.GrowOnlySet; -import org.apache.gossip.crdt.OrSet; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.gossip.model.PerNodeDataMessage; -import org.apache.gossip.model.SharedDataMessage; -import org.junit.Test; - -import io.teknek.tunit.TUnit; - -public class DataTest extends AbstractIntegrationBase { - - private String orSetKey = "cror"; - private String gCounterKey = "crdtgc"; - - @Test - public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ - GossipSettings settings = new GossipSettings(); - settings.setPersistRingState(false); - settings.setPersistDataState(false); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - String cluster = UUID.randomUUID().toString(); - int seedNodes = 1; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - startupMembers.add(new RemoteMember(cluster, uri, i + "")); - } - final List clients = new ArrayList<>(); - final int clusterMembers = 2; - for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) - .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); - clients.add(gossipService); - gossipService.init(); - register(gossipService); - } - TUnit.assertThat(() -> { - int total = 0; 
- for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); - clients.get(0).gossipPerNodeData(msg()); - clients.get(0).gossipSharedData(sharedMsg()); - - TUnit.assertThat(()-> { - PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); - if (x == null) - return ""; - else - return x.getPayload(); - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - - TUnit.assertThat(() -> { - SharedDataMessage x = clients.get(1).findSharedGossipData("a"); - if (x == null) - return ""; - else - return x.getPayload(); - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); - - - givenDifferentDatumsInSet(clients); - assertThatListIsMerged(clients); - - givenOrs(clients); - assertThatOrSetIsMerged(clients); - dropIt(clients); - assertThatOrSetDelIsMerged(clients); - - - // test g counter - givenDifferentIncrement(clients); - assertThatCountIsUpdated(clients, 3); - givenIncreaseOther(clients); - assertThatCountIsUpdated(clients, 7); - - for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); - } - } - - private void givenDifferentIncrement(final List clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - - private void givenIncreaseOther(final List clients) { - GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); - GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, - new 
GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); - - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(gc2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - - private void givenOrs(List clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet("1", "2")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet("3", "4")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - - private void dropIt(List clients) { - @SuppressWarnings("unchecked") - OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); - OrSet o2 = new OrSet(o, new OrSet.Builder().remove("3")); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(o2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - - private void assertThatOrSetIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); - TUnit.assertThat(() -> { - return clients.get(1).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); - } - - private void assertThatOrSetDelIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet("1", "2", "4")); - } - - private void givenDifferentDatumsInSet(final List clients){ - clients.get(0).merge(CrdtMessage("1")); - clients.get(1).merge(CrdtMessage("2")); - } - - - private void assertThatCountIsUpdated(final List clients, long 
finalCount) { - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(gCounterKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( - new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); - } - - private void assertThatListIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt("cr"); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet(Arrays.asList("1","2"))); - } - - private SharedDataMessage CrdtMessage(String item){ - SharedDataMessage d = new SharedDataMessage(); - d.setKey("cr"); - d.setPayload(new GrowOnlySet( Arrays.asList(item))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - return d; - } - - private PerNodeDataMessage msg(){ - PerNodeDataMessage g = new PerNodeDataMessage(); - g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("b"); - g.setTimestamp(System.currentTimeMillis()); - return g; - } - - private SharedDataMessage sharedMsg(){ - SharedDataMessage g = new SharedDataMessage(); - g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("c"); - g.setTimestamp(System.currentTimeMillis()); - return g; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java deleted file mode 100644 index 1b6a32a..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.junit.jupiter.api.Test; -import org.junit.platform.runner.JUnitPlatform; -import org.junit.runner.RunWith; - -import io.teknek.tunit.TUnit; - -@RunWith(JUnitPlatform.class) -public class IdAndPropertyTest extends AbstractIntegrationBase { - - @Test - public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { - GossipSettings settings = new GossipSettings(); - settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - List startupMembers = new ArrayList<>(); - Map x = new HashMap<>(); - x.put("a", "b"); - x.put("datacenter", "dc1"); - x.put("rack", "rack1"); - GossipManager gossipService1 = GossipManagerBuilder.newBuilder() - .cluster("a") - .uri(new 
URI("udp://" + "127.0.0.1" + ":" + (29000 + 0))) - .id("0") - .properties(x) - .gossipMembers(startupMembers) - .gossipSettings(settings).build(); - gossipService1.init(); - register(gossipService1); - - Map y = new HashMap<>(); - y.put("a", "c"); - y.put("datacenter", "dc2"); - y.put("rack", "rack2"); - GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a") - .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10))) - .id("1") - .properties(y) - .gossipMembers(Arrays.asList(new RemoteMember("a", - new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"))) - .gossipSettings(settings).build(); - gossipService2.init(); - register(gossipService2); - - TUnit.assertThat(() -> { - String value = ""; - try { - value = gossipService1.getLiveMembers().get(0).getProperties().get("a"); - } catch (RuntimeException e){ } - return value; - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c"); - - TUnit.assertThat(() -> { - String value = ""; - try { - value = gossipService2.getLiveMembers().get(0).getProperties().get("a"); - } catch (RuntimeException e){ } - return value; - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java deleted file mode 100644 index 30c52bc..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import io.teknek.tunit.TUnit; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.log4j.Logger; - -import org.junit.platform.runner.JUnitPlatform; -import org.junit.jupiter.api.Test; - -import org.junit.runner.RunWith; - -@RunWith(JUnitPlatform.class) -public class ShutdownDeadtimeTest { - - private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); - - // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the - // sleep that happens after startup. 
- @Test - public void DeadNodesDoNotComeAliveAgain() - throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - settings.setPersistRingState(false); - settings.setPersistDataState(false); - String cluster = UUID.randomUUID().toString(); - int seedNodes = 3; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); - startupMembers.add(new RemoteMember(cluster, uri, i + "")); - } - final List clients = Collections.synchronizedList(new ArrayList()); - final int clusterMembers = 5; - for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster(cluster) - .uri(uri) - .id(i + "") - .gossipMembers(startupMembers) - .gossipSettings(settings) - .build(); - clients.add(gossipService); - gossipService.init(); - Thread.sleep(1000); - } - TUnit.assertThat(new Callable() { - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - } - }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); - - // shutdown one client and verify that one client is lost. 
- Random r = new Random(); - int randomClientId = r.nextInt(clusterMembers); - log.info("shutting down " + randomClientId); - final int shutdownPort = clients.get(randomClientId).getMyself().getUri() - .getPort(); - final String shutdownId = clients.get(randomClientId).getMyself().getId(); - clients.get(randomClientId).shutdown(); - TUnit.assertThat(new Callable() { - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - } - }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16); - clients.remove(randomClientId); - - TUnit.assertThat(new Callable() { - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers - 1; ++i) { - total += clients.get(i).getDeadMembers().size(); - } - return total; - } - }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4); - - URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); - // start client again - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .gossipSettings(settings) - .cluster(cluster) - .uri(uri) - .id(shutdownId+"") - .gossipMembers(startupMembers) - .build(); - clients.add(gossipService); - gossipService.init(); - - // verify that the client is alive again for every node - TUnit.assertThat(new Callable() { - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - } - }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20); - - for (int i = 0; i < clusterMembers; ++i) { - final int j = i; - new Thread() { - public void run(){ - clients.get(j).shutdown(); - } - }.start(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java ---------------------------------------------------------------------- diff --git 
a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java deleted file mode 100644 index f669a23..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gossip; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.gossip.manager.PassiveGossipConstants; -import org.apache.gossip.secure.KeyTool; -import org.junit.Assert; -import org.junit.Test; - -import io.teknek.tunit.TUnit; - -public class SignedMessageTest extends AbstractIntegrationBase { - - private GossipSettings gossiperThatSigns(){ - GossipSettings settings = new GossipSettings(); - settings.setPersistRingState(false); - settings.setPersistDataState(false); - settings.setSignMessages(true); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - return settings; - } - - private GossipSettings gossiperThatSigns(String keysDir){ - GossipSettings settings = gossiperThatSigns(); - settings.setPathToKeyStore(Objects.requireNonNull(keysDir)); - return settings; - } - - @Test - public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException { - final String keys = System.getProperty("java.io.tmpdir") + "/keys"; - GossipSettings settings = gossiperThatSigns(keys); - setup(keys); - String cluster = UUID.randomUUID().toString(); - List startupMembers = new ArrayList<>(); - for (int i = 1; i < 2; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); - startupMembers.add(new RemoteMember(cluster, uri, i + "")); - } - final List clients = new ArrayList<>(); - for (int i = 1; i < 3; ++i) { - URI uri = new URI("udp://" 
+ "127.0.0.1" + ":" + (30000 + i)); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster(cluster) - .uri(uri) - .id(i + "") - .gossipMembers(startupMembers) - .gossipSettings(settings) - .build(); - gossipService.init(); - clients.add(gossipService); - } - assertTwoAlive(clients); - assertOnlySignedMessages(clients); - cleanup(keys, clients); - } - - private void assertTwoAlive(List clients){ - TUnit.assertThat(() -> { - int total = 0; - for (int i = 0; i < clients.size(); ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); - } - - private void assertOnlySignedMessages(List clients){ - Assert.assertEquals(0, clients.get(0).getRegistry() - .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount()); - Assert.assertTrue(clients.get(0).getRegistry() - .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0); - } - - private void cleanup(String keys, List clients){ - new File(keys, "1").delete(); - new File(keys, "2").delete(); - new File(keys).delete(); - for (int i = 0; i < clients.size(); ++i) { - clients.get(i).shutdown(); - } - } - - private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException { - new File(keys).mkdir(); - KeyTool.generatePubandPrivateKeyFiles(keys, "1"); - KeyTool.generatePubandPrivateKeyFiles(keys, "2"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java deleted file mode 100644 index ea93a90..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.log4j.Logger; - -import org.junit.jupiter.api.Test; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.UUID; -import org.junit.platform.runner.JUnitPlatform; -import org.junit.runner.RunWith; - -/** - * Tests support of using {@code StartupSettings} and thereby reading - * setup config from file. 
- */ -@RunWith(JUnitPlatform.class) -public class StartupSettingsTest { - private static final Logger log = Logger.getLogger(StartupSettingsTest.class); - private static final String CLUSTER = UUID.randomUUID().toString(); - - @Test - public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException { - File settingsFile = File.createTempFile("gossipTest",".json"); - settingsFile.deleteOnExit(); - writeSettingsFile(settingsFile); - URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); - GossipSettings firstGossipSettings = new GossipSettings(); - firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - GossipManager firstService = GossipManagerBuilder.newBuilder() - .cluster(CLUSTER) - .uri(uri) - .id("1") - .gossipSettings(firstGossipSettings).build(); - firstService.init(); - GossipManager manager = GossipManagerBuilder.newBuilder() - .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build(); - manager.init(); - firstService.shutdown(); - manager.shutdown(); - } - - private void writeSettingsFile( File target ) throws IOException { - String settings = - "[{\n" + // It is odd that this is meant to be in an array, but oh well. 
- " \"cluster\":\"" + CLUSTER + "\",\n" + - " \"id\":\"" + "2" + "\",\n" + - " \"uri\":\"udp://127.0.0.1:50001\",\n" + - " \"gossip_interval\":1000,\n" + - " \"window_size\":1000,\n" + - " \"minimum_samples\":5,\n" + - " \"cleanup_interval\":10000,\n" + - " \"convict_threshold\":2.6,\n" + - " \"distribution\":\"exponential\",\n" + - " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" + - " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" + - " \"properties\":{},\n" + - " \"members\":[\n" + - " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + - " ]\n" + - "}]"; - - log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" ); - FileOutputStream output = new FileOutputStream(target); - output.write(settings.getBytes()); - output.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java deleted file mode 100644 index c6d7d46..0000000 --- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import io.teknek.tunit.TUnit; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; -import org.junit.platform.runner.JUnitPlatform; -import org.junit.runner.RunWith; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.GossipManagerBuilder; -import org.junit.jupiter.api.Test; - -@RunWith(JUnitPlatform.class) -public class TenNodeThreeSeedTest { - - @Test - public void test() throws UnknownHostException, InterruptedException, URISyntaxException { - abc(30150); - } - - @Test - public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException { - abc(30100); - } - - public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); - settings.setPersistRingState(false); - settings.setPersistDataState(false); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); - String cluster = UUID.randomUUID().toString(); - int seedNodes = 3; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); - startupMembers.add(new 
RemoteMember(cluster, uri, i + "")); - } - final List clients = new ArrayList<>(); - final int clusterMembers = 5; - for (int i = 1; i < clusterMembers+1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster(cluster) - .uri(uri) - .id(i + "") - .gossipSettings(settings) - .gossipMembers(startupMembers) - .build(); - gossipService.init(); - clients.add(gossipService); - } - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); - - for (int i = 0; i < clusterMembers; ++i) { - int j = i; - new Thread(){ - public void run(){ - clients.get(j).shutdown(); - } - }.start(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java index ec91d67..42b9353 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java @@ -102,7 +102,7 @@ public class MessageHandlerTest { @Test(expected = NullPointerException.class) public void cantAddNullHandler2() { - MessageHandler handler = MessageHandlerFactory.concurrentHandler( + MessageHandlerFactory.concurrentHandler( new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()), null, new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) 
http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java ---------------------------------------------------------------------- diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java index a783b75..206bc62 100644 --- a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java +++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java @@ -29,7 +29,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** Only use in unit tests! */ -public class UnitTestTransportManager extends AbstractTransportManager { +public class UnitTestTransportManager extends AbstractTransportManager { private static final Map allManagers = new ConcurrentHashMap<>(); @@ -71,6 +71,5 @@ public class UnitTestTransportManager extends AbstractTransportManager { @Override public void startEndpoint() { allManagers.put(localEndpoint, this); - super.startEndpoint(); } } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-itest/pom.xml b/gossip-itest/pom.xml new file mode 100644 index 0000000..6067732 --- /dev/null +++ b/gossip-itest/pom.xml @@ -0,0 +1,88 @@ + + + + 4.0.0 + + + org.apache.gossip + gossip-parent + 0.1.3-incubating-SNAPSHOT + ../pom.xml + + + Gossip itest + gossip-itest + 0.1.3-incubating-SNAPSHOT + + + + org.apache.gossip + gossip-base + ${project.version} + + + org.apache.gossip + gossip-base + ${project.version} + test-jar + test + + + org.apache.gossip + gossip-protocol-jackson + ${project.version} + + + org.apache.gossip + gossip-transport-udp + ${project.version} + + + + + + + maven-compiler-plugin + 3.1 + + ${java.version} + 
${java.version} + + + + maven-surefire-plugin + 2.19.1 + + + ${project.build.directory} + + + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + + + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/DataTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java new file mode 100644 index 0000000..9fe9aa9 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.crdt.GrowOnlyCounter; +import org.apache.gossip.crdt.GrowOnlySet; +import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class DataTest extends AbstractIntegrationBase { + + private String orSetKey = "cror"; + private String gCounterKey = "crdtgc"; + + @Test + public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + 
clients.get(0).gossipPerNodeData(msg()); + clients.get(0).gossipSharedData(sharedMsg()); + + TUnit.assertThat(()-> { + PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); + if (x == null) + return ""; + else + return x.getPayload(); + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); + + TUnit.assertThat(() -> { + SharedDataMessage x = clients.get(1).findSharedGossipData("a"); + if (x == null) + return ""; + else + return x.getPayload(); + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + + + givenDifferentDatumsInSet(clients); + assertThatListIsMerged(clients); + + givenOrs(clients); + assertThatOrSetIsMerged(clients); + dropIt(clients); + assertThatOrSetDelIsMerged(clients); + + + // test g counter + givenDifferentIncrement(clients); + assertThatCountIsUpdated(clients, 3); + givenIncreaseOther(clients); + assertThatCountIsUpdated(clients, 7); + + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } + } + + private void givenDifferentIncrement(final List clients) { + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + } + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } + } + + private void givenIncreaseOther(final List clients) { + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); + GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(gc2); + d.setExpireAt(Long.MAX_VALUE); + 
d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } + + private void givenOrs(List clients) { + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(orSetKey); + d.setPayload(new OrSet("1", "2")); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + } + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(orSetKey); + d.setPayload(new OrSet("3", "4")); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } + } + + private void dropIt(List clients) { + @SuppressWarnings("unchecked") + OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); + OrSet o2 = new OrSet(o, new OrSet.Builder().remove("3")); + SharedDataMessage d = new SharedDataMessage(); + d.setKey(orSetKey); + d.setPayload(o2); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + } + + private void assertThatOrSetIsMerged(final List clients){ + TUnit.assertThat(() -> { + return clients.get(0).findCrdt(orSetKey).value(); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); + TUnit.assertThat(() -> { + return clients.get(1).findCrdt(orSetKey).value(); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); + } + + private void assertThatOrSetDelIsMerged(final List clients){ + TUnit.assertThat(() -> { + return clients.get(0).findCrdt(orSetKey); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet("1", "2", "4")); + } + + private void givenDifferentDatumsInSet(final List clients){ + clients.get(0).merge(CrdtMessage("1")); + clients.get(1).merge(CrdtMessage("2")); + } + + + private void assertThatCountIsUpdated(final List clients, long finalCount) { + TUnit.assertThat(() -> { + return clients.get(0).findCrdt(gCounterKey); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( + new 
GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); + } + + private void assertThatListIsMerged(final List clients){ + TUnit.assertThat(() -> { + return clients.get(0).findCrdt("cr"); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet(Arrays.asList("1","2"))); + } + + private SharedDataMessage CrdtMessage(String item){ + SharedDataMessage d = new SharedDataMessage(); + d.setKey("cr"); + d.setPayload(new GrowOnlySet( Arrays.asList(item))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + return d; + } + + private PerNodeDataMessage msg(){ + PerNodeDataMessage g = new PerNodeDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("b"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + + private SharedDataMessage sharedMsg(){ + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("c"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java new file mode 100644 index 0000000..7f550de --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import io.teknek.tunit.TUnit; + +@RunWith(JUnitPlatform.class) +public class IdAndPropertyTest extends AbstractIntegrationBase { + + @Test + public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { + GossipSettings settings = new GossipSettings(); + settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + List startupMembers = new ArrayList<>(); + Map x = new HashMap<>(); + x.put("a", "b"); + x.put("datacenter", "dc1"); + x.put("rack", "rack1"); + GossipManager gossipService1 = GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0))) + .id("0") + .properties(x) + .gossipMembers(startupMembers) + .gossipSettings(settings).build(); + gossipService1.init(); + register(gossipService1); + + Map y = new HashMap<>(); + y.put("a", "c"); + y.put("datacenter", "dc2"); + y.put("rack", "rack2"); + GossipManager gossipService2 = 
GossipManagerBuilder.newBuilder().cluster("a") + .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10))) + .id("1") + .properties(y) + .gossipMembers(Arrays.asList(new RemoteMember("a", + new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"))) + .gossipSettings(settings).build(); + gossipService2.init(); + register(gossipService2); + + TUnit.assertThat(() -> { + String value = ""; + try { + value = gossipService1.getLiveMembers().get(0).getProperties().get("a"); + } catch (RuntimeException e){ } + return value; + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c"); + + TUnit.assertThat(() -> { + String value = ""; + try { + value = gossipService2.getLiveMembers().get(0).getProperties().get("a"); + } catch (RuntimeException e){ } + return value; + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java new file mode 100644 index 0000000..dd5bfe9 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.log4j.Logger; + +import org.junit.platform.runner.JUnitPlatform; +import org.junit.jupiter.api.Test; + +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) +public class ShutdownDeadtimeTest { + + private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); + + // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the + // sleep that happens after startup. 
+ @Test + public void DeadNodesDoNotComeAliveAgain() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 3; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = Collections.synchronizedList(new ArrayList()); + final int clusterMembers = 5; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipMembers(startupMembers) + .gossipSettings(settings) + .build(); + clients.add(gossipService); + gossipService.init(); + Thread.sleep(1000); + } + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); + + // shutdown one client and verify that one client is lost. 
+ Random r = new Random(); + int randomClientId = r.nextInt(clusterMembers); + log.info("shutting down " + randomClientId); + final int shutdownPort = clients.get(randomClientId).getMyself().getUri() + .getPort(); + final String shutdownId = clients.get(randomClientId).getMyself().getId(); + clients.get(randomClientId).shutdown(); + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16); + clients.remove(randomClientId); + + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers - 1; ++i) { + total += clients.get(i).getDeadMembers().size(); + } + return total; + } + }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4); + + URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); + // start client again + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .gossipSettings(settings) + .cluster(cluster) + .uri(uri) + .id(shutdownId+"") + .gossipMembers(startupMembers) + .build(); + clients.add(gossipService); + gossipService.init(); + + // verify that the client is alive again for every node + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20); + + for (int i = 0; i < clusterMembers; ++i) { + final int j = i; + new Thread() { + public void run(){ + clients.get(j).shutdown(); + } + }.start(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java ---------------------------------------------------------------------- diff --git 
a/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java new file mode 100644 index 0000000..e288cb8 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.security.NoSuchAlgorithmException; +import java.security.NoSuchProviderException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.secure.KeyTool; +import org.junit.Assert; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class SignedMessageTest extends AbstractIntegrationBase { + + private GossipSettings gossiperThatSigns(){ + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + settings.setSignMessages(true); + return settings; + } + + private GossipSettings gossiperThatSigns(String keysDir){ + GossipSettings settings = gossiperThatSigns(); + settings.setPathToKeyStore(Objects.requireNonNull(keysDir)); + return settings; + } + + @Test + public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException { + final String keys = System.getProperty("java.io.tmpdir") + "/keys"; + GossipSettings settings = gossiperThatSigns(keys); + setup(keys); + String cluster = UUID.randomUUID().toString(); + List<Member> startupMembers = new ArrayList<>(); + for (int i = 1; i < 2; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List<GossipManager> clients = new ArrayList<>(); + for (int i = 1; i < 3; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipMembers(startupMembers) +
.gossipSettings(settings) + .build(); + gossipService.init(); + clients.add(gossipService); + } + assertTwoAlive(clients); + assertOnlySignedMessages(clients); + cleanup(keys, clients); + } + + private void assertTwoAlive(List<GossipManager> clients){ + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clients.size(); ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + } + + private void assertOnlySignedMessages(List<GossipManager> clients){ + Assert.assertEquals(0, clients.get(0).getRegistry() + .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount()); + Assert.assertTrue(clients.get(0).getRegistry() + .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0); + } + + private void cleanup(String keys, List<GossipManager> clients){ + new File(keys, "1").delete(); + new File(keys, "2").delete(); + new File(keys).delete(); + for (int i = 0; i < clients.size(); ++i) { + clients.get(i).shutdown(); + } + } + + private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException { + new File(keys).mkdir(); + KeyTool.generatePubandPrivateKeyFiles(keys, "1"); + KeyTool.generatePubandPrivateKeyFiles(keys, "2"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java new file mode 100644 index 0000000..ea93a90 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.log4j.Logger; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +/** + * Tests support of using {@code StartupSettings} and thereby reading + * setup config from file. 
+ */ +@RunWith(JUnitPlatform.class) +public class StartupSettingsTest { + private static final Logger log = Logger.getLogger(StartupSettingsTest.class); + private static final String CLUSTER = UUID.randomUUID().toString(); + + @Test + public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException { + File settingsFile = File.createTempFile("gossipTest",".json"); + settingsFile.deleteOnExit(); + writeSettingsFile(settingsFile); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); + GossipSettings firstGossipSettings = new GossipSettings(); + firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); + GossipManager firstService = GossipManagerBuilder.newBuilder() + .cluster(CLUSTER) + .uri(uri) + .id("1") + .gossipSettings(firstGossipSettings).build(); + firstService.init(); + GossipManager manager = GossipManagerBuilder.newBuilder() + .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build(); + manager.init(); + firstService.shutdown(); + manager.shutdown(); + } + + private void writeSettingsFile( File target ) throws IOException { + String settings = + "[{\n" + // It is odd that this is meant to be in an array, but oh well. 
+ " \"cluster\":\"" + CLUSTER + "\",\n" + + " \"id\":\"" + "2" + "\",\n" + + " \"uri\":\"udp://127.0.0.1:50001\",\n" + + " \"gossip_interval\":1000,\n" + + " \"window_size\":1000,\n" + + " \"minimum_samples\":5,\n" + + " \"cleanup_interval\":10000,\n" + + " \"convict_threshold\":2.6,\n" + + " \"distribution\":\"exponential\",\n" + + " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" + + " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" + + " \"properties\":{},\n" + + " \"members\":[\n" + + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + + " ]\n" + + "}]"; + + log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" ); + FileOutputStream output = new FileOutputStream(target); + output.write(settings.getBytes()); + output.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java ---------------------------------------------------------------------- diff --git a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java new file mode 100644 index 0000000..8ae783e --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.junit.jupiter.api.Test; + +@RunWith(JUnitPlatform.class) +public class TenNodeThreeSeedTest { + + @Test + public void test() throws UnknownHostException, InterruptedException, URISyntaxException { + abc(30150); + } + + @Test + public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException { + abc(30100); + } + + public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 3; + List<Member> startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List<GossipManager> clients = new ArrayList<>(); + final int clusterMembers = 5; + for (int i = 1; i < clusterMembers+1; ++i) { + URI uri = new URI("udp://" +
"127.0.0.1" + ":" + (base + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipSettings(settings) + .gossipMembers(startupMembers) + .build(); + gossipService.init(); + clients.add(gossipService); + } + TUnit.assertThat(new Callable<Integer> (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); + + for (int i = 0; i < clusterMembers; ++i) { + int j = i; + new Thread(){ + public void run(){ + clients.get(j).shutdown(); + } + }.start(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml index 067a27e..128a26d 100644 --- a/gossip-protocol-jackson/pom.xml +++ b/gossip-protocol-jackson/pom.xml @@ -36,16 +36,16 @@ org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} test-jar test - \ No newline at end of file + http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index bd8a949..cbac460 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -25,9 +25,7 @@ import org.apache.gossip.Member; import
org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.gossip.model.Base; import org.apache.gossip.protocol.ProtocolManager; -import org.apache.gossip.udp.Trackable; import org.junit.Assert; import org.junit.Test; @@ -36,11 +34,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.UUID; public class JacksonTest { http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java ---------------------------------------------------------------------- diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java index 43032de..7ac211d 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java @@ -41,6 +41,7 @@ class TestMessage extends Base implements Trackable { private Object[] arrayOfThings; private Map mapOfThings = new HashMap<>(); + @SuppressWarnings("unused")//Used by ObjectMapper private TestMessage() { } http://git-wip-us.apache.org/repos/asf/incubator-gossip/blob/c62ebaf9/gossip-transport-udp/pom.xml ---------------------------------------------------------------------- diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml index 2e79b1a..446aace 100644 --- a/gossip-transport-udp/pom.xml +++ b/gossip-transport-udp/pom.xml @@ -36,8 +36,8 @@ org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} - \ No newline at end of file +