Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8B58A17B0C for ; Sat, 9 May 2015 14:47:09 +0000 (UTC) Received: (qmail 5920 invoked by uid 500); 9 May 2015 14:47:09 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 5891 invoked by uid 500); 9 May 2015 14:47:09 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 5882 invoked by uid 99); 9 May 2015 14:47:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 May 2015 14:47:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 51118E0006; Sat, 9 May 2015 14:47:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: randgalt@apache.org To: commits@curator.apache.org Message-Id: <59e3ecf7152542f0a8950f737fdfd976@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: curator git commit: Moved EnsembleTracker and did some refactoring Date: Sat, 9 May 2015 14:47:09 +0000 (UTC) Repository: curator Updated Branches: refs/heads/CURATOR-160 3cbc3afd6 -> 5e6cd0e91 Moved EnsembleTracker and did some refactoring Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5e6cd0e9 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5e6cd0e9 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5e6cd0e9 Branch: refs/heads/CURATOR-160 Commit: 5e6cd0e9137297a264c34339f52dd737d50a8982 Parents: 3cbc3af Author: randgalt Authored: Sat May 9 09:47:04 2015 -0500 Committer: randgalt Committed: Sat May 9 09:47:04 2015 -0500 ---------------------------------------------------------------------- .../framework/ensemble/EnsembleTracker.java | 191 ++++++++++++++++++ .../curator/framework/imps/EnsembleTracker.java | 196 ------------------- .../framework/imps/TestReconfiguration.java | 1 + 3 files changed, 192 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/5e6cd0e9/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java new file mode 100644 index 0000000..375e1f0 --- /dev/null +++ b/curator-framework/src/main/java/org/apache/curator/framework/ensemble/EnsembleTracker.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.curator.framework.ensemble; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import org.apache.curator.ensemble.EnsembleListener; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorWatcher; +import org.apache.curator.framework.listen.ListenerContainer; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances. + */ +public class EnsembleTracker implements Closeable +{ + private final Logger log = LoggerFactory.getLogger(getClass()); + private final CuratorFramework client; + private final AtomicReference state = new AtomicReference<>(State.LATENT); + private final ListenerContainer listeners = new ListenerContainer<>(); + private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) + { + try + { + reset(); + } + catch ( Exception e ) + { + log.error("Trying to reset after reconnection", e); + } + } + } + }; + + private final CuratorWatcher watcher = new CuratorWatcher() + { + @Override + public void process(WatchedEvent event) throws Exception + { + if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) + { + reset(); + } + } + }; + + private enum State + { + LATENT, + STARTED, + CLOSED + } + + private final BackgroundCallback backgroundCallback = new BackgroundCallback() + { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception + { + processBackgroundResult(event); + } + }; + + public EnsembleTracker(CuratorFramework client) + { + this.client = client; + } + + public void start() throws Exception + { + Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); + client.getConnectionStateListenable().addListener(connectionStateListener); + reset(); + } + + @Override + public void close() throws IOException + { + if ( state.compareAndSet(State.STARTED, State.CLOSED) ) + { + listeners.clear(); + } + client.getConnectionStateListenable().removeListener(connectionStateListener); + } + + /** + * Return the ensemble listenable + * + * @return listenable + */ + public ListenerContainer getListenable() + { + Preconditions.checkState(state.get() != State.CLOSED, "Closed"); + + return listeners; + } + + private void reset() throws Exception + { + client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble(); + } + + private void processBackgroundResult(CuratorEvent event) throws Exception + { + switch ( event.getType() ) + { + case GET_CONFIG: + { + if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) + { + processConfigData(event.getData()); + } + } + } + } + + private void processConfigData(byte[] data) throws Exception + { + Properties properties = new Properties(); + properties.load(new ByteArrayInputStream(data)); + QuorumVerifier qv = new QuorumMaj(properties); + StringBuilder sb = new StringBuilder(); + for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() ) + { + if ( sb.length() != 0 ) + { + sb.append(","); + } + sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort()); + } + + final String connectionString = sb.toString(); + listeners.forEach + ( + new Function() + { + @Override + public Void apply(EnsembleListener listener) + { + try + { + listener.connectionStringUpdated(connectionString); + } + catch ( Exception e ) + { + log.error("Calling listener", e); + } + return null; + } + } + ); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/5e6cd0e9/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java deleted file mode 100644 index 6688848..0000000 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/EnsembleTracker.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.curator.framework.imps; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import org.apache.curator.ensemble.EnsembleListener; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.api.BackgroundCallback; -import org.apache.curator.framework.api.CuratorEvent; -import org.apache.curator.framework.api.CuratorWatcher; -import org.apache.curator.framework.listen.ListenerContainer; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; -import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.IOException; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Tracks changes to the ensemble and notifies registered {@link org.apache.curator.ensemble.EnsembleListener} instances. - */ -public class EnsembleTracker implements Closeable -{ - private final Logger log = LoggerFactory.getLogger(getClass()); - private final CuratorFramework client; - private final AtomicReference state = new AtomicReference(State.LATENT); - private final ListenerContainer listeners = new ListenerContainer(); - private final AtomicBoolean isConnected = new AtomicBoolean(true); - private final ConnectionStateListener connectionStateListener = new ConnectionStateListener() - { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) - { - if ( isConnected.compareAndSet(false, true) ) - { - try - { - reset(); - } - catch ( Exception e ) - { - log.error("Trying to reset after reconnection", e); - } - } - } - else - { - isConnected.set(false); - } - } - }; - - private final CuratorWatcher watcher = new CuratorWatcher() - { - @Override - public void process(WatchedEvent event) throws Exception - { - reset(); - } - }; - - private enum State - { - LATENT, - STARTED, - CLOSED - } - - private final BackgroundCallback backgroundCallback = new BackgroundCallback() - { - @Override - public void processResult(CuratorFramework client, CuratorEvent event) throws Exception - { - processBackgroundResult(event); - } - }; - - public EnsembleTracker(CuratorFramework client) - { - this.client = client; - } - - public void start() throws Exception - { - Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); - client.getConnectionStateListenable().addListener(connectionStateListener); - reset(); - } - - @Override - public void close() throws IOException - { - if ( state.compareAndSet(State.STARTED, State.CLOSED) ) - { - listeners.clear(); - } - client.getConnectionStateListenable().removeListener(connectionStateListener); - } - - /** - * Return the ensemble listenable - * - * @return listenable - */ - public ListenerContainer getListenable() - { - Preconditions.checkState(state.get() != State.CLOSED, "Closed"); - - return listeners; - } - - private void reset() throws Exception - { - client.getConfig().usingWatcher(watcher).inBackground(backgroundCallback).forEnsemble(); - } - - private void processBackgroundResult(CuratorEvent event) throws Exception - { - switch ( event.getType() ) - { - case GET_CONFIG: - { - if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) - { - processConfigData(event.getData()); - } - } - } - } - - private void processConfigData(byte[] data) throws Exception - { - Properties properties = new Properties(); - properties.load(new ByteArrayInputStream(data)); - QuorumVerifier qv = new QuorumMaj(properties); - StringBuilder sb = new StringBuilder(); - for ( QuorumPeer.QuorumServer server : qv.getAllMembers().values() ) - { - if ( sb.length() != 0 ) - { - sb.append(","); - } - sb.append(server.clientAddr.getAddress().getHostAddress()).append(":").append(server.clientAddr.getPort()); - } - - final String connectionString = sb.toString(); - listeners.forEach - ( - new Function() - { - @Override - public Void apply(EnsembleListener listener) - { - try - { - listener.connectionStringUpdated(connectionString); - } - catch ( Exception e ) - { - log.error("Calling listener", e); - } - return null; - } - } - ); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/5e6cd0e9/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java index 297cf9b..133e690 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReconfiguration.java @@ -25,6 +25,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.ensemble.EnsembleTracker; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingCluster;