Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8DDC6177D3 for ; Tue, 12 May 2015 13:54:02 +0000 (UTC) Received: (qmail 56712 invoked by uid 500); 12 May 2015 13:54:02 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 56674 invoked by uid 500); 12 May 2015 13:54:02 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 56663 invoked by uid 99); 12 May 2015 13:54:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 12 May 2015 13:54:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5B001DFF1B; Tue, 12 May 2015 13:54:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Date: Tue, 12 May 2015 13:54:02 -0000 Message-Id: <4f78bbf8221e442385ad54149899906d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/25] curator git commit: [CURATOR-160] Add builders and dsl for ZooKeeper's config and reconfig methods. Repository: curator Updated Branches: refs/heads/CURATOR-3.0 3016ce2f4 -> fc45a4ec8 [CURATOR-160] Add builders and dsl for ZooKeeper's config and reconfig methods. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2c576dc3 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2c576dc3 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2c576dc3 Branch: refs/heads/CURATOR-3.0 Commit: 2c576dc344a167ad4a72d71412c98d76ff4e2d3d Parents: d4883a8 Author: Ioannis Canellos Authored: Thu Nov 6 17:34:47 2014 +0200 Committer: Ioannis Canellos Committed: Thu Nov 6 17:34:47 2014 +0200 ---------------------------------------------------------------------- .../curator/framework/CuratorFramework.java | 12 + .../framework/api/AsyncReconfigurable.java | 29 ++ .../curator/framework/api/DataCallbackable.java | 32 ++ .../curator/framework/api/GetConfigBuilder.java | 27 ++ .../api/IncrementalReconfigBuilder.java | 33 ++ .../apache/curator/framework/api/Joinable.java | 40 +++ .../apache/curator/framework/api/Leaveable.java | 38 +++ .../curator/framework/api/Memberable.java | 40 +++ .../api/NonIncrementalReconfigBuilder.java | 32 ++ .../curator/framework/api/ReconfigBuilder.java | 26 ++ .../framework/api/SyncReconfigurable.java | 30 ++ .../framework/imps/CuratorFrameworkImpl.java | 10 + .../framework/imps/GetConfigBuilderImpl.java | 80 +++++ .../framework/imps/ReconfigBuilderImpl.java | 182 +++++++++++ .../framework/imps/TestReconfiguration.java | 303 +++++++++++++++++++ 15 files changed, 914 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 9c23ddb..181e4e8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -122,6 +122,18 @@ public interface CuratorFramework extends Closeable public SetACLBuilder setACL(); /** + * Start a reconfig builder + * @return builder object + */ + public ReconfigBuilder reconfig(); + + /** + * Start a getConfig builder + * @return + */ + public GetConfigBuilder getConfig(); + + /** * Start a transaction builder * * @return builder object http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java new file mode 100644 index 0000000..fc7fd57 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/AsyncReconfigurable.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface AsyncReconfigurable { + + /** + * Sets the configuration version to use. + * @param config The version of the configuration. + * @throws Exception + */ + void fromConfig(long config) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java new file mode 100644 index 0000000..75ded65 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/DataCallbackable.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +import org.apache.zookeeper.AsyncCallback.DataCallback; + +public interface DataCallbackable { + + /** + * Passes a callback and a context object to the config/reconfig command. + * @param callback The async callback to use. + * @param ctx An object that will be passed to the callback. + * @return this + */ + T usingDataCallback(DataCallback callback, Object ctx); +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java new file mode 100644 index 0000000..c7c013b --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/GetConfigBuilder.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface GetConfigBuilder extends + Watchable, + DataCallbackable, + Statable { +} + + http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java new file mode 100644 index 0000000..0ad6426 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/IncrementalReconfigBuilder.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * An incremental reconfiguration builder. + * This builder has access only to the incremental reconfiguration methods join and leave, so that we prevent + * mixing concepts that can't be used together. + * @param + */ +public interface IncrementalReconfigBuilder extends + Joinable>, + Leaveable>, + DataCallbackable, + Statable { + +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java new file mode 100644 index 0000000..ff1b3c5 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Joinable.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +import java.util.Collection; + +public interface Joinable { + + /** + * Adds a server to join the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * @param server The server to join. + * @return this. + */ + T join(String server); + + /** + * Adds a collection of servers to the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * @param servers The collection of servers to join + * @return this + */ + T join(Collection servers); +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java new file mode 100644 index 0000000..8560d65 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Leaveable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +import java.util.Collection; + +public interface Leaveable { + + /** + * Sets a server to leave the ensemble. + * @param server The server id. + * @return this + */ + T leave(String server); + + /** + * Sets a collection of servers to leave the ensemble. + * @param servers The collection of server ids. + * @return this. + */ + T leave(Collection servers); +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java new file mode 100644 index 0000000..5b62dba --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Memberable.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +import java.util.Collection; + +public interface Memberable { + + /** + * Sets a member that is meant to be part of the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * @param server The server to add as a member of the ensemble. + * @return this. + */ + T withMember(String server); + + /** + * Sets a collection of members member that is meant to be part of the ensemble. + * The expected format is server.[id]=[hostname]:[peer port]:[election port]:[type];[client port] + * @param servers The collection of server to set as a members of the ensemble. + * @return this. + */ + T withMembers(Collection servers); +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java new file mode 100644 index 0000000..2f6a9c6 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/NonIncrementalReconfigBuilder.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +/** + * An non-incremental reconfiguration builder. + * This builder has access only to the non-incremental reconfiguration methods withMembers, so that we prevent + * mixing concepts that can't be used together. + * @param + */ +public interface NonIncrementalReconfigBuilder extends + Memberable>, + DataCallbackable, + Statable { + +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java new file mode 100644 index 0000000..0e420a1 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/ReconfigBuilder.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface ReconfigBuilder extends + Joinable>, + Leaveable>, + Memberable> { + +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java new file mode 100644 index 0000000..bd7b96b --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/SyncReconfigurable.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.api; + +public interface SyncReconfigurable { + + /** + * Sets the configuration version to use. + * @param config The version of the configuration. + * @return The configuration data. + * @throws Exception + */ + byte[] fromConfig(long config) throws Exception; +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index cf38e21..53f0a61 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -413,6 +413,16 @@ public class CuratorFrameworkImpl implements CuratorFramework } @Override + public ReconfigBuilder reconfig() { + return new ReconfigBuilderImpl(this); + } + + @Override + public GetConfigBuilder getConfig() { + return new GetConfigBuilderImpl(this); + } + + @Override public CuratorTransaction inTransaction() { Preconditions.checkState(getState() == CuratorFrameworkState.STARTED, "instance must be started before calling this method"); http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java new file mode 100644 index 0000000..a56894d --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetConfigBuilderImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.api.GetConfigBuilder; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; + +public class GetConfigBuilderImpl implements GetConfigBuilder { + + private final CuratorFrameworkImpl client; + private boolean watched; + private Watcher watcher; + + public GetConfigBuilderImpl(CuratorFrameworkImpl client) { + this.client = client; + } + + @Override + public Void usingDataCallback(AsyncCallback.DataCallback callback, Object ctx) { + try { + if (watcher != null) { + client.getZooKeeper().getConfig(watcher, callback, ctx); + } else { + client.getZooKeeper().getConfig(watched, callback, ctx); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } + + @Override + public byte[] storingStatIn(Stat stat) { + try { + if (watcher != null) { + return client.getZooKeeper().getConfig(watcher, stat); + } else { + return client.getZooKeeper().getConfig(watched, stat); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public GetConfigBuilder watched() { + this.watched = true; + return this; + } + + @Override + public GetConfigBuilder usingWatcher(Watcher watcher) { + this.watcher = watcher; + return null; + } + + @Override + public GetConfigBuilder usingWatcher(final CuratorWatcher watcher) { + throw new UnsupportedOperationException("GetConfigBuilder doesn't support CuratorWatcher, please use Watcher instead."); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java new file mode 100644 index 0000000..7b39be6 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ReconfigBuilderImpl.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.api.AsyncReconfigurable; +import org.apache.curator.framework.api.IncrementalReconfigBuilder; +import org.apache.curator.framework.api.NonIncrementalReconfigBuilder; +import org.apache.curator.framework.api.ReconfigBuilder; +import org.apache.curator.framework.api.SyncReconfigurable; +import org.apache.zookeeper.AsyncCallback.DataCallback; +import org.apache.zookeeper.data.Stat; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +public class ReconfigBuilderImpl implements ReconfigBuilder { + + private final CuratorFrameworkImpl client; + + private static class IncrementalReconfigBuilderImpl implements IncrementalReconfigBuilder { + + private final CuratorFrameworkImpl client; + private List joiningServers = new LinkedList(); + private List leavingServers = new LinkedList(); + + private IncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) { + this.client = client; + } + + @Override + public IncrementalReconfigBuilderImpl join(String server) { + joiningServers.add(server); + return this; + } + + @Override + public IncrementalReconfigBuilder join(Collection servers) { + joiningServers.addAll(servers); + return this; + } + + @Override + public IncrementalReconfigBuilderImpl leave(String server) { + leavingServers.add(server); + return this; + } + + @Override + public IncrementalReconfigBuilder leave(Collection servers) { + leavingServers.addAll(servers); + return this; + } + + @Override + public SyncReconfigurable storingStatIn(final Stat stat) { + return new SyncReconfigurable() { + @Override + public byte[] fromConfig(long config) throws Exception { + return client + .getZooKeeper() + .reconfig(joiningServers.isEmpty() ? null : joiningServers, + leavingServers.isEmpty() ? null : leavingServers, + null, + config, stat); + } + }; + } + + @Override + public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) { + return new AsyncReconfigurable() { + @Override + public void fromConfig(long config) throws Exception { + client.getZooKeeper() + .reconfig(joiningServers.isEmpty() ? null : joiningServers, + leavingServers.isEmpty() ? null : leavingServers, + null, + config, callback, ctx); + } + }; + } + } + + private static class NonIncrementalReconfigBuilderImpl implements NonIncrementalReconfigBuilder { + + private final CuratorFrameworkImpl client; + private List newMembers = new LinkedList(); + + private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client) { + this.client = client; + } + + private NonIncrementalReconfigBuilderImpl(CuratorFrameworkImpl client, List newMembers) { + this.client = client; + this.newMembers = newMembers; + } + + @Override + public NonIncrementalReconfigBuilder withMember(String server) { + newMembers.add(server); + return this; + } + + @Override + public NonIncrementalReconfigBuilder withMembers(Collection servers) { + newMembers.addAll(servers); + return this; + } + + @Override + public SyncReconfigurable storingStatIn(final Stat stat) { + return new SyncReconfigurable() { + @Override + public byte[] fromConfig(long config) throws Exception { + return client.getZooKeeper().reconfig(null, null, newMembers, config, stat); + } + }; + } + + @Override + public AsyncReconfigurable usingDataCallback(final DataCallback callback, final Object ctx) { + return new AsyncReconfigurable() { + @Override + public void fromConfig(long config) throws Exception { + client.getZooKeeper().reconfig(null, null, newMembers, config, callback, ctx); + } + }; + } + } + + + public ReconfigBuilderImpl(CuratorFrameworkImpl client) { + this.client = client; + } + + @Override + public IncrementalReconfigBuilder join(String server) { + return new IncrementalReconfigBuilderImpl(client).join(server); + } + + @Override + public IncrementalReconfigBuilder join(Collection servers) { + return new IncrementalReconfigBuilderImpl(client).join(servers); + } + + @Override + public IncrementalReconfigBuilder leave(String server) { + return new IncrementalReconfigBuilderImpl(client).leave(server); + } + + @Override + public IncrementalReconfigBuilder leave(Collection servers) { + return new IncrementalReconfigBuilderImpl(client).leave(servers); + } + + @Override + public NonIncrementalReconfigBuilder withMember(String server) { + return new NonIncrementalReconfigBuilderImpl(client).withMember(server); + } + + @Override + public NonIncrementalReconfigBuilder withMembers(Collection servers) { + return new NonIncrementalReconfigBuilderImpl(client).withMembers(servers); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/2c576dc3/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java new file mode 100644 index 0000000..6918825 --- /dev/null +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.framework.imps; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.test.InstanceSpec; +import org.apache.curator.test.TestingCluster; +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TestReconfiguration { + + static TestingCluster cluster; + + @BeforeClass + public void setup() throws Exception { + cluster = new TestingCluster(5); + cluster.start(); + } + + @AfterClass + public void tearDown() throws IOException { + cluster.close(); + } + + @Test + public void testSyncIncremental() throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); + client.start(); + client.blockUntilConnected(); + try { + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(5, qv.getAllMembers().size()); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + //Remove Servers + bytes = client.reconfig().leave("1").storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(4, qv.getAllMembers().size()); + bytes = client.reconfig().leave("2").storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(3, qv.getAllMembers().size()); + + //Add Servers + bytes = client.reconfig().join("server.1=" + server1).storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(4, qv.getAllMembers().size()); + bytes = client.reconfig().join("server.2=" + server2).storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(5, qv.getAllMembers().size()); + } finally { + client.close(); + } + } + + @Test + public void testAsyncIncremental() throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); + client.start(); + client.blockUntilConnected(); + try { + final AtomicReference bytes = new AtomicReference(); + final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + bytes.set(data); + ((CountDownLatch)ctx).countDown(); + } + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().usingDataCallback(callback, latch); + latch.await(5, TimeUnit.SECONDS); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(5, qv.getAllMembers().size()); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + + + //Remove Servers + latch = new CountDownLatch(1); + client.reconfig().leave("1").usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(4, qv.getAllMembers().size()); + latch = new CountDownLatch(1); + client.reconfig().leave("2").usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(3, qv.getAllMembers().size()); + + //Add Servers + latch = new CountDownLatch(1); + client.reconfig().join("server.1=" + server1).usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(4, qv.getAllMembers().size()); + latch = new CountDownLatch(1); + client.reconfig().join("server.2=" + server2).usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(5, qv.getAllMembers().size()); + } finally { + client.close(); + } + } + + @Test + public void testSyncNonIncremental() throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); + client.start(); + client.blockUntilConnected(); + try { + Stat stat = new Stat(); + byte[] bytes = client.getConfig().storingStatIn(stat); + Assert.assertNotNull(bytes); + QuorumVerifier qv = getQuorumVerifier(bytes); + Assert.assertEquals(5, qv.getAllMembers().size()); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + //Remove Servers + bytes = client.reconfig() + .withMember("server.2="+server2) + .withMember("server.3="+server3) + .withMember("server.4="+server4) + .withMember("server.5="+server5) + .storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(4, qv.getAllMembers().size()); + bytes = client.reconfig() + .withMember("server.3=" + server3) + .withMember("server.4=" + server4) + .withMember("server.5=" + server5) + .storingStatIn(stat).fromConfig(qv.getVersion()); + + qv = getQuorumVerifier(bytes); + Assert.assertEquals(3, qv.getAllMembers().size()); + + //Add Servers + bytes = client.reconfig() + .withMember("server.1="+server1) + .withMember("server.3=" + server3) + .withMember("server.4="+server4) + .withMember("server.5="+server5) + .storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(4, qv.getAllMembers().size()); + bytes = client.reconfig() + .withMember("server.1="+server1) + .withMember("server.2="+server2) + .withMember("server.3=" + server3) + .withMember("server.4="+server4) + .withMember("server.5="+server5) + .storingStatIn(stat).fromConfig(qv.getVersion()); + qv = getQuorumVerifier(bytes); + Assert.assertEquals(5, qv.getAllMembers().size()); + } finally { + client.close(); + } + } + + @Test + public void testAsyncNonIncremental() throws Exception { + CuratorFramework client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), new RetryOneTime(1)); + client.start(); + client.blockUntilConnected(); + try { + final AtomicReference bytes = new AtomicReference(); + final AsyncCallback.DataCallback callback = new AsyncCallback.DataCallback() { + @Override + public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { + bytes.set(data); + ((CountDownLatch)ctx).countDown(); + } + }; + + CountDownLatch latch = new CountDownLatch(1); + client.getConfig().usingDataCallback(callback, latch); + latch.await(5, TimeUnit.SECONDS); + Assert.assertNotNull(bytes.get()); + QuorumVerifier qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(5, qv.getAllMembers().size()); + String server1 = getServerString(qv, cluster, 1L); + String server2 = getServerString(qv, cluster, 2L); + String server3 = getServerString(qv, cluster, 3L); + String server4 = getServerString(qv, cluster, 4L); + String server5 = getServerString(qv, cluster, 5L); + + + //Remove Servers + latch = new CountDownLatch(1); + client.reconfig() + .withMember("server.2=" + server2) + .withMember("server.3="+server3) + .withMember("server.4="+server4) + .withMember("server.5="+server5) + .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(4, qv.getAllMembers().size()); + latch = new CountDownLatch(1); + client.reconfig() + .withMember("server.3="+server3) + .withMember("server.4=" + server4) + .withMember("server.5=" + server5) + .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(3, qv.getAllMembers().size()); + + //Add Servers + latch = new CountDownLatch(1); + client.reconfig() + .withMember("server.1="+server1) + .withMember("server.3=" + server3) + .withMember("server.4=" + server4) + .withMember("server.5=" + server5) + .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(4, qv.getAllMembers().size()); + latch = new CountDownLatch(1); + client.reconfig() + .withMember("server.1="+server1) + .withMember("server.2="+server2) + .withMember("server.3="+server3) + .withMember("server.4=" + server4) + .withMember("server.5=" + server5) + .usingDataCallback(callback, latch).fromConfig(qv.getVersion()); + latch.await(5, TimeUnit.SECONDS); + qv = getQuorumVerifier(bytes.get()); + Assert.assertEquals(5, qv.getAllMembers().size()); + } finally { + client.close(); + } + } + + + static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception { + Properties properties = new Properties(); + properties.load(new StringReader(new String(bytes))); + return new QuorumMaj(properties); + } + + static InstanceSpec getInstance(TestingCluster cluster, int id) { + for (InstanceSpec spec : cluster.getInstances()) { + if (spec.getServerId() == id) { + return spec; + } + } + throw new IllegalStateException("InstanceSpec with id:" + id + " not found"); + } + + static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception { + String str = qv.getAllMembers().get(id).toString(); + //check if connection string is already there. + if (str.contains(";")) { + return str; + } else { + return str + ";" + getInstance(cluster, (int) id).getConnectString(); + } + } +} \ No newline at end of file