Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 74F4618F4A for ; Fri, 9 Oct 2015 00:09:56 +0000 (UTC) Received: (qmail 90050 invoked by uid 500); 9 Oct 2015 00:09:56 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 89980 invoked by uid 500); 9 Oct 2015 00:09:56 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 89875 invoked by uid 99); 9 Oct 2015 00:09:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 00:09:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA1CFE0C6E; Fri, 9 Oct 2015 00:09:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Fri, 09 Oct 2015 00:09:57 -0000 Message-Id: <4f6313ec9bae4e198317b5bf05fe7f98@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/10] curator git commit: wip - adding() API was misnamed. Also, it's mutually exclusive with join/leave wip - adding() API was misnamed. Also, it's mutually exclusive with join/leave Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d42ef172 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d42ef172 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d42ef172 Branch: refs/heads/CURATOR-3.0 Commit: d42ef172e57af17ed42d7c2c4e2d9a7a0c520f3c Parents: 4c3c837 Author: randgalt Authored: Fri Sep 25 21:07:44 2015 -0500 Committer: randgalt Committed: Fri Sep 25 21:07:44 2015 -0500 ---------------------------------------------------------------------- .../api/JoinStatConfigEnsembleable.java | 2 +- .../api/LeaveStatConfigEnsembleable.java | 2 +- .../curator/framework/api/Membersable.java | 43 +++ .../framework/api/ReconfigBuilderMain.java | 6 +- .../api/StatConfigureEnsembleable.java | 26 ++ .../framework/imps/ReconfigBuilderImpl.java | 370 +++---------------- .../framework/imps/TestReconfiguration.java | 205 +++++++--- .../framework/imps/TestReconfigurationX.java | 33 +- .../org/apache/curator/test/TestingCluster.java | 40 +- 9 files changed, 337 insertions(+), 390 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java index 7ab51e2..c20387c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/JoinStatConfigEnsembleable.java @@ -25,7 +25,7 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface JoinStatConfigEnsembleable extends - Joinable, + Joinable, ConfigureEnsembleable, Statable { http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java index 1464d26..b80bd00 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/LeaveStatConfigEnsembleable.java @@ -25,7 +25,7 @@ package org.apache.curator.framework.api; * mixing concepts that can't be used together. */ public interface LeaveStatConfigEnsembleable extends - Leaveable>, + Leaveable, ConfigureEnsembleable, Statable { http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java new file mode 100644 index 0000000..e1f8d9e --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Membersable.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.api; + +import java.util.List; + +public interface Membersable +{ + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param server The server joining. + * @return this + */ + T withNewMembers(String... server); + + /** + * Sets one or more members that are meant to be the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * + * @param servers The servers joining. + * @return this + */ + T withNewMembers(List servers); +} http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java index b86af2d..b9d1be3 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilderMain.java @@ -20,8 +20,8 @@ package org.apache.curator.framework.api; public interface ReconfigBuilderMain extends - Joinable, - Leaveable, - Addable + Joinable, + Leaveable, + Membersable { } http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java new file mode 100644 index 0000000..8b61ab9 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/StatConfigureEnsembleable.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.api; + +public interface StatConfigureEnsembleable extends + Statable, + ConfigureEnsembleable +{ +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java index 832272b..e786883 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -31,18 +31,14 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; -public class ReconfigBuilderImpl implements - ReconfigBuilder, - ReconfigBuilderMain, - ConfigureEnsembleable, - BackgroundOperation,Statable>,Ensembleable +public class ReconfigBuilderImpl implements ReconfigBuilder, BackgroundOperation { private final CuratorFrameworkImpl client; private Backgrounding backgrounding = new Backgrounding(); private Stat responseStat; private long fromConfig = -1; - private List adding; + private List newMembers; private List joining; private List leaving; @@ -51,8 +47,7 @@ public class ReconfigBuilderImpl implements this.client = client; } - @Override - public byte[] forEnsemble() throws Exception + private byte[] forEnsemble() throws Exception { if ( backgrounding.inBackground() ) { @@ -66,152 +61,6 @@ public class ReconfigBuilderImpl implements } @Override - public Ensembleable storingStatIn(Stat stat) - { - responseStat = stat; - return this; - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - fromConfig = config; - return this; - } - - @Override - public JoinLeaveStatConfigEnsembleable adding(String... server) - { - return adding((server != null) ? Arrays.asList(server) : null); - } - - @Override - public JoinLeaveStatConfigEnsembleable adding(List servers) - { - this.adding = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.of(); - - return new JoinLeaveStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new ConfigureEnsembleable() - { - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - }; - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public LeaveStatConfigEnsembleable joining(String... server) - { - return joining((server != null) ? Arrays.asList(server) : null); - } - - @Override - public LeaveStatConfigEnsembleable joining(List servers) - { - return new LeaveStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new InternalConfigureEnsembleable(); - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public Statable leaving(List servers) - { - return ReconfigBuilderImpl.this.leaving(servers); - } - - @Override - public Statable leaving(String... server) - { - return ReconfigBuilderImpl.this.leaving(server); - } - }; - } - - @Override - public JoinStatConfigEnsembleable leaving(String... server) - { - return leaving((server != null) ? Arrays.asList(server) : null); - } - - @Override - public JoinStatConfigEnsembleable leaving(List servers) - { - return new JoinStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new InternalConfigureEnsembleable(); - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public ConfigureEnsembleable joining(List servers) - { - return ReconfigBuilderImpl.this.joining(servers); - } - - @Override - public ConfigureEnsembleable joining(String... server) - { - return ReconfigBuilderImpl.this.joining(server); - } - }; - } - }; - } - - @Override public ReconfigBuilderMain inBackground() { backgrounding = new Backgrounding(true); @@ -254,19 +103,25 @@ public class ReconfigBuilderImpl implements } @Override - public LeaveAddStatConfigEnsembleable joining(String... server) + public StatConfigureEnsembleable withNewMembers(String... server) { - return joining((server != null) ? Arrays.asList(server) : null); + return withNewMembers((server != null) ? Arrays.asList(server) : null); } @Override - public LeaveAddStatConfigEnsembleable joining(List servers) + public StatConfigureEnsembleable withNewMembers(List servers) { - joining = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.of(); - - return new LeaveAddStatConfigEnsembleable() + newMembers = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.of(); + return new StatConfigureEnsembleable() { @Override + public Ensembleable fromConfig(long config) throws Exception + { + fromConfig = config; + return this; + } + + @Override public byte[] forEnsemble() throws Exception { return ReconfigBuilderImpl.this.forEnsemble(); @@ -275,115 +130,71 @@ public class ReconfigBuilderImpl implements @Override public ConfigureEnsembleable storingStatIn(Stat stat) { - return new InternalConfigureEnsembleable(); + responseStat = stat; + return this; } + }; + } + + @Override + public LeaveStatConfigEnsembleable joining(String... server) + { + return joining((server != null) ? Arrays.asList(server) : null); + } + + @Override + public LeaveStatConfigEnsembleable joining(List servers) + { + joining = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.of(); + return new LeaveStatConfigEnsembleable() + { @Override - public Ensembleable fromConfig(long config) throws Exception + public byte[] forEnsemble() throws Exception { - return ReconfigBuilderImpl.this.fromConfig(config); + return ReconfigBuilderImpl.this.forEnsemble(); } @Override - public LeaveStatConfigEnsembleable adding(String... server) + public ConfigureEnsembleable storingStatIn(Stat stat) { - return adding((server != null) ? Arrays.asList(server) : null); + responseStat = stat; + return this; } @Override - public LeaveStatConfigEnsembleable adding(List servers) + public Ensembleable fromConfig(long config) throws Exception { - return new LeaveStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new InternalConfigureEnsembleable(); - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public Statable leaving(List servers) - { - return ReconfigBuilderImpl.this.leaving(servers); - } - - @Override - public Statable leaving(String... server) - { - return ReconfigBuilderImpl.this.leaving(server); - } - }; + fromConfig = config; + return this; } @Override - public AddStatConfigEnsembleable leaving(String... server) + public JoinStatConfigEnsembleable leaving(String... server) { - return leaving((server != null) ? Arrays.asList(server) : null); + return ReconfigBuilderImpl.this.leaving(server); } @Override - public AddStatConfigEnsembleable leaving(List servers) + public JoinStatConfigEnsembleable leaving(List servers) { - return new AddStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new InternalConfigureEnsembleable(); - } - - @Override - public ConfigureEnsembleable fromConfig(long config) throws Exception - { - return new InternalConfigureEnsembleable(); - } - - @Override - public Statable adding(List servers) - { - return ReconfigBuilderImpl.this.adding(servers); - } - - @Override - public Statable adding(String... server) - { - return ReconfigBuilderImpl.this.adding(server); - } - }; + return ReconfigBuilderImpl.this.leaving(servers); } }; } @Override - public JoinAddStatConfigEnsembleable leaving(String... server) + public JoinStatConfigEnsembleable leaving(String... server) { return leaving((server != null) ? Arrays.asList(server) : null); } @Override - public JoinAddStatConfigEnsembleable leaving(List servers) + public JoinStatConfigEnsembleable leaving(List servers) { leaving = (servers != null) ? ImmutableList.copyOf(servers) : ImmutableList.of(); - return new JoinAddStatConfigEnsembleable() + return new JoinStatConfigEnsembleable() { @Override public byte[] forEnsemble() throws Exception @@ -394,81 +205,27 @@ public class ReconfigBuilderImpl implements @Override public ConfigureEnsembleable storingStatIn(Stat stat) { - return new InternalConfigureEnsembleable(); + responseStat = stat; + return this; } @Override public Ensembleable fromConfig(long config) throws Exception { - return ReconfigBuilderImpl.this.fromConfig(config); + fromConfig = config; + return this; } @Override - public JoinStatConfigurable adding(String... server) - { - return adding((server != null) ? Arrays.asList(server) : null); - } - - @Override - public JoinStatConfigurable adding(List servers) - { - return new JoinStatConfigurable() - { - @Override - public ConfigureEnsembleable joining(List servers) - { - return ReconfigBuilderImpl.this.joining(servers); - } - - @Override - public ConfigureEnsembleable joining(String... server) - { - return ReconfigBuilderImpl.this.joining(server); - } - }; - } - - @Override - public AddStatConfigEnsembleable joining(String... server) + public LeaveStatConfigEnsembleable joining(String... server) { return joining((server != null) ? Arrays.asList(server) : null); } @Override - public AddStatConfigEnsembleable joining(List servers) + public LeaveStatConfigEnsembleable joining(List servers) { - return new AddStatConfigEnsembleable() - { - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - - @Override - public ConfigureEnsembleable storingStatIn(Stat stat) - { - return new InternalConfigureEnsembleable(); - } - - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public Statable adding(List servers) - { - return ReconfigBuilderImpl.this.adding(servers); - } - - @Override - public Statable adding(String... server) - { - return ReconfigBuilderImpl.this.adding(server); - } - }; + return ReconfigBuilderImpl.this.joining(servers); } }; } @@ -491,7 +248,7 @@ public class ReconfigBuilderImpl implements client.processBackgroundOperation(data, event); } }; - client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, callback, backgrounding.getContext()); + client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, callback, backgrounding.getContext()); } private byte[] ensembleInForeground() throws Exception @@ -505,26 +262,11 @@ public class ReconfigBuilderImpl implements @Override public byte[] call() throws Exception { - return client.getZooKeeper().reconfig(joining, leaving, adding, fromConfig, responseStat); + return client.getZooKeeper().reconfig(joining, leaving, newMembers, fromConfig, responseStat); } } ); trace.commit(); return responseData; } - - private class InternalConfigureEnsembleable implements ConfigureEnsembleable - { - @Override - public Ensembleable fromConfig(long config) throws Exception - { - return ReconfigBuilderImpl.this.fromConfig(config); - } - - @Override - public byte[] forEnsemble() throws Exception - { - return ReconfigBuilderImpl.this.forEnsemble(); - } - } } http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index d4c89be..99e5a2e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -19,12 +19,17 @@ package org.apache.curator.framework.imps; +import com.google.common.collect.Lists; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.BaseClassForTests; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster; +import org.apache.curator.test.Timing; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -37,7 +42,12 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; public class TestReconfiguration extends BaseClassForTests { @@ -84,75 +94,53 @@ public class TestReconfiguration extends BaseClassForTests // --------- - client.reconfig().adding().forEnsemble(); client.reconfig().leaving().forEnsemble(); client.reconfig().joining().forEnsemble(); - client.reconfig().adding().leaving().forEnsemble(); - client.reconfig().adding().joining().forEnsemble(); client.reconfig().leaving().joining().forEnsemble(); + client.reconfig().joining().leaving().forEnsemble(); + client.reconfig().withNewMembers().forEnsemble(); - client.reconfig().adding().fromConfig(0).forEnsemble(); client.reconfig().leaving().fromConfig(0).forEnsemble(); client.reconfig().joining().fromConfig(0).forEnsemble(); - client.reconfig().adding().leaving().fromConfig(0).forEnsemble(); - client.reconfig().adding().joining().fromConfig(0).forEnsemble(); client.reconfig().leaving().joining().fromConfig(0).forEnsemble(); + client.reconfig().joining().leaving().fromConfig(0).forEnsemble(); + client.reconfig().withNewMembers().fromConfig(0).forEnsemble(); - client.reconfig().adding().fromConfig(0).forEnsemble(); - client.reconfig().leaving().fromConfig(0).forEnsemble(); - client.reconfig().joining().fromConfig(0).forEnsemble(); - client.reconfig().adding().leaving().fromConfig(0).forEnsemble(); - client.reconfig().adding().joining().fromConfig(0).forEnsemble(); - client.reconfig().leaving().joining().fromConfig(0).forEnsemble(); - - client.reconfig().adding().storingStatIn(stat).forEnsemble(); client.reconfig().leaving().storingStatIn(stat).forEnsemble(); client.reconfig().joining().storingStatIn(stat).forEnsemble(); - client.reconfig().adding().leaving().storingStatIn(stat).forEnsemble(); - client.reconfig().adding().joining().storingStatIn(stat).forEnsemble(); client.reconfig().leaving().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().joining().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().withNewMembers().storingStatIn(stat).forEnsemble(); - client.reconfig().adding().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); - client.reconfig().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); - client.reconfig().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().withNewMembers().storingStatIn(stat).forEnsemble(); - client.reconfig().inBackground().adding().forEnsemble(); client.reconfig().inBackground().leaving().forEnsemble(); client.reconfig().inBackground().joining().forEnsemble(); - client.reconfig().inBackground().adding().leaving().forEnsemble(); - client.reconfig().inBackground().adding().joining().forEnsemble(); client.reconfig().inBackground().leaving().joining().forEnsemble(); + client.reconfig().inBackground().joining().leaving().forEnsemble(); + client.reconfig().inBackground().withNewMembers().forEnsemble(); - client.reconfig().inBackground().adding().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().joining().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble(); - - client.reconfig().inBackground().adding().fromConfig(0).forEnsemble(); client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble(); client.reconfig().inBackground().joining().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().leaving().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().joining().fromConfig(0).forEnsemble(); client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().joining().leaving().fromConfig(0).forEnsemble(); + client.reconfig().inBackground().withNewMembers().fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().storingStatIn(stat).forEnsemble(); client.reconfig().inBackground().leaving().storingStatIn(stat).forEnsemble(); client.reconfig().inBackground().joining().storingStatIn(stat).forEnsemble(); - client.reconfig().inBackground().adding().leaving().storingStatIn(stat).forEnsemble(); - client.reconfig().inBackground().adding().joining().storingStatIn(stat).forEnsemble(); client.reconfig().inBackground().leaving().joining().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().joining().leaving().storingStatIn(stat).forEnsemble(); + client.reconfig().inBackground().withNewMembers().storingStatIn(stat).forEnsemble(); - client.reconfig().inBackground().adding().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().inBackground().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().inBackground().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); - client.reconfig().inBackground().adding().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); client.reconfig().inBackground().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble(); + client.reconfig().inBackground().withNewMembers().storingStatIn(stat).forEnsemble(); } @Test @@ -163,33 +151,154 @@ public class TestReconfiguration extends BaseClassForTests client.start(); QuorumVerifier quorumVerifier = toQuorumVerifier(client.getConfig().forEnsemble()); System.out.println(quorumVerifier); + assertConfig(quorumVerifier, cluster.getInstances()); + } + } + + @Test + public void testAdd() throws Exception + { + Timing timing = new Timing(); + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + assertConfig(oldConfig, cluster.getInstances()); - for ( InstanceSpec instance : cluster.getInstances() ) + CountDownLatch latch = setChangeWaiter(client); + try ( TestingCluster newCluster = new TestingCluster(1, false) ) { - QuorumPeer.QuorumServer quorumServer = quorumVerifier.getAllMembers().get((long)instance.getServerId()); - Assert.assertNotNull(quorumServer); - Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort()); + newCluster.start(); + + client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble(); + + Assert.assertTrue(timing.awaitLatch(latch)); + + QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + List newInstances = Lists.newArrayList(cluster.getInstances()); + newInstances.addAll(newCluster.getInstances()); + assertConfig(newConfig, newInstances); } } } @Test - public void testAdd1Sync() throws Exception + public void testAddAsync() throws Exception { + Timing timing = new Timing(); try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) ) { client.start(); - Watcher watcher = new Watcher() + QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + assertConfig(oldConfig, cluster.getInstances()); + + CountDownLatch latch = setChangeWaiter(client); + try ( TestingCluster newCluster = new TestingCluster(1, false) ) { - @Override - public void process(WatchedEvent event) + newCluster.start(); + + final CountDownLatch callbackLatch = new CountDownLatch(1); + BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + if ( event.getType() == CuratorEventType.RECONFIG ) + { + callbackLatch.countDown(); + } + } + }; + client.reconfig().inBackground(callback).joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble(); + + Assert.assertTrue(timing.awaitLatch(callbackLatch)); + Assert.assertTrue(timing.awaitLatch(latch)); + QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + List newInstances = Lists.newArrayList(cluster.getInstances()); + newInstances.addAll(newCluster.getInstances()); + assertConfig(newConfig, newInstances); + } + } + } + + @Test + public void testAddAndRemove() throws Exception + { + Timing timing = new Timing(); + try ( CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)) ) + { + client.start(); + + QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + assertConfig(oldConfig, cluster.getInstances()); + + CountDownLatch latch = setChangeWaiter(client); + + try ( TestingCluster newCluster = new TestingCluster(1, false) ) + { + newCluster.start(); + + Collection oldInstances = cluster.getInstances(); + InstanceSpec us = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper()); + InstanceSpec removeSpec = oldInstances.iterator().next(); + if ( us.equals(removeSpec) ) { + Iterator iterator = oldInstances.iterator(); + iterator.next(); + removeSpec = iterator.next(); } - }; - client.getConfig().usingWatcher(watcher).forEnsemble(); + + Collection instances = newCluster.getInstances(); + client.reconfig().leaving(Integer.toString(removeSpec.getServerId())).joining(toReconfigSpec(instances)).fromConfig(oldConfig.getVersion()).forEnsemble(); + + Assert.assertTrue(timing.awaitLatch(latch)); + + QuorumVerifier newConfig = toQuorumVerifier(client.getConfig().forEnsemble()); + ArrayList newInstances = Lists.newArrayList(oldInstances); + newInstances.addAll(instances); + newInstances.remove(removeSpec); + assertConfig(newConfig, newInstances); + } + } + } + + private CountDownLatch setChangeWaiter(CuratorFramework client) throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + Watcher watcher = new Watcher() + { + @Override + public void process(WatchedEvent event) + { + if ( event.getType() == Event.EventType.NodeDataChanged ) + { + latch.countDown(); + } + } + }; + client.getConfig().usingWatcher(watcher).forEnsemble(); + return latch; + } + + private void assertConfig(QuorumVerifier config, Collection instances) + { + for ( InstanceSpec instance : instances ) + { + QuorumPeer.QuorumServer quorumServer = config.getAllMembers().get((long)instance.getServerId()); + Assert.assertNotNull(quorumServer, String.format("Looking for %s - found %s", instance.getServerId(), config.getAllMembers())); + Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort()); + } + } + + private List toReconfigSpec(Collection instances) + { + List specs = Lists.newArrayList(); + for ( InstanceSpec instance : instances ) { + specs.add("server." + instance.getServerId() + "=localhost:" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort()); } + return specs; } private static QuorumVerifier toQuorumVerifier(byte[] bytes) throws Exception http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java index 2268055..7554ddd 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfigurationX.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.imps; +import com.google.common.collect.ImmutableList; import org.apache.curator.ensemble.EnsembleListener; import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider; import org.apache.curator.framework.CuratorFramework; @@ -63,7 +64,13 @@ public class TestReconfigurationX @BeforeMethod public void setup() throws Exception { - cluster = new TestingCluster(5); + ImmutableList.Builder builder = ImmutableList.builder(); + for ( int i = 1; i <= 5; ++i ) + { + builder.add(new InstanceSpec(null, -1, -1, -1, true, i, -1, -1)); + } + + cluster = new TestingCluster(builder.build()); cluster.start(); connectionString1to5 = cluster.getConnectString(); @@ -208,7 +215,7 @@ public class TestReconfigurationX //Remove Servers bytes = client.reconfig() - .adding("server.2=" + server2, + .withNewMembers("server.2=" + server2, "server.3=" + server3, "server.4=" + server4, "server.5=" + server5) @@ -220,7 +227,7 @@ public class TestReconfigurationX Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); bytes = client.reconfig() - .adding("server.3=" + server3, + .withNewMembers("server.3=" + server3, "server.4=" + server4, "server.5=" + server5) .storingStatIn(stat).fromConfig(qv.getVersion()).forEnsemble(); @@ -233,7 +240,7 @@ public class TestReconfigurationX //Add Servers bytes = client.reconfig() - .adding("server.2=" + server2, + .withNewMembers("server.2=" + server2, "server.3=" + server3, "server.4=" + server4, "server.5=" + server5) @@ -245,7 +252,7 @@ public class TestReconfigurationX Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5); bytes = client.reconfig() - .adding("server.1=" + server1, + .withNewMembers("server.1=" + server1, "server.2=" + server2, "server.3=" + server3, "server.4=" + server4, @@ -287,7 +294,7 @@ public class TestReconfigurationX //Remove Servers client.reconfig().inBackground(callback, latch) - .adding("server.2=" + server2, + .withNewMembers("server.2=" + server2, "server.3=" + server3, "server.4=" + server4, "server.5=" + server5) @@ -298,7 +305,7 @@ public class TestReconfigurationX Assert.assertEquals(qv.getAllMembers().size(), 4); client.reconfig().inBackground(callback, latch) - .adding("server.3=" + server3, + .withNewMembers("server.3=" + server3, "server.4=" + server4, "server.5=" + server5) .fromConfig(qv.getVersion()).forEnsemble(); @@ -309,7 +316,7 @@ public class TestReconfigurationX //Add Servers client.reconfig().inBackground(callback, latch) - .adding("server.2=" + server2, + .withNewMembers("server.2=" + server2, "server.3=" + server3, "server.4=" + server4, "server.5=" + server5) @@ -320,7 +327,7 @@ public class TestReconfigurationX Assert.assertEquals(qv.getAllMembers().size(), 4); client.reconfig().inBackground(callback, latch) - .adding("server.1=" + server1, + .withNewMembers("server.1=" + server1, "server.2=" + server2, "server.3=" + server3, "server.4=" + server4, @@ -332,14 +339,14 @@ public class TestReconfigurationX Assert.assertEquals(qv.getAllMembers().size(), 5); } - static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception + private static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception { Properties properties = new Properties(); properties.load(new StringReader(new String(bytes))); return new QuorumMaj(properties); } - static InstanceSpec getInstance(TestingCluster cluster, int id) + private static InstanceSpec getInstance(TestingCluster cluster, int id) { for ( InstanceSpec spec : cluster.getInstances() ) { @@ -351,7 +358,7 @@ public class TestReconfigurationX throw new IllegalStateException("InstanceSpec with id:" + id + " not found"); } - static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception + private static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception { String str = qv.getAllMembers().get(id).toString(); //check if connection string is already there. @@ -365,7 +372,7 @@ public class TestReconfigurationX } } - static String getConnectionString(TestingCluster cluster, long... ids) throws Exception + private static String getConnectionString(TestingCluster cluster, long... ids) throws Exception { StringBuilder sb = new StringBuilder(); Map specs = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/curator/blob/d42ef172/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java ---------------------------------------------------------------------- diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java index e2a1ae8..b8dada8 100644 --- a/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java +++ b/curator-test/src/main/java/org/apache/curator/test/TestingCluster.java @@ -52,6 +52,18 @@ public class TestingCluster implements Closeable } /** + * Creates an ensemble comprised of n servers. Each server will use + * a temp directory and random ports + * + * @param instanceQty number of servers to create in the ensemble + * @param resetServerIds if true, server Ids are reset first + */ + public TestingCluster(int instanceQty, boolean resetServerIds) + { + this(makeSpecs(instanceQty, resetServerIds)); + } + + /** * Creates an ensemble using the given server specs * * @param specs the server specs @@ -99,17 +111,17 @@ public class TestingCluster implements Closeable public Collection getInstances() { Iterable transformed = Iterables.transform - ( - servers, - new Function() - { - @Override - public InstanceSpec apply(TestingZooKeeperServer server) + ( + servers, + new Function() { - return server.getInstanceSpec(); + @Override + public InstanceSpec apply(TestingZooKeeperServer server) + { + return server.getInstanceSpec(); + } } - } - ); + ); return Lists.newArrayList(transformed); } @@ -244,7 +256,15 @@ public class TestingCluster implements Closeable private static Map> makeSpecs(int instanceQty) { - InstanceSpec.reset(); + return makeSpecs(instanceQty, true); + } + + private static Map> makeSpecs(int instanceQty, boolean resetServerIds) + { + if ( resetServerIds ) + { + InstanceSpec.reset(); + } ImmutableList.Builder builder = ImmutableList.builder(); for ( int i = 0; i < instanceQty; ++i ) {