curator-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Cantrell, Curtis" <Curtis.Cantr...@bkfs.com>
Subject RE: Help with SharedValue FailOver
Date Tue, 19 Jul 2016 19:36:28 GMT
package com.bkfs.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;

import com.lps.mg.log.DeferLogger;
import com.lps.mg.utils.MessageResources;
import com.lps.mg.utils.StringUtils;

public class CuratorClient implements ConnectionStateListener {

      /** Log4J Logger for this class     */
      private final static DeferLogger logger = DeferLogger.getLogger(CuratorClient.class);
      private static DeferLogger alertLog = DeferLogger.getLogger("SpectrumAlert");

      protected static MessageResources messages = MessageResources.getInstance();

      /** ZooKeeper properties in the config.properties file*/
      protected static final String ZOOKEEPER_CONN = "zookeeper.connection.string";

      protected static final String CONFIG         = "config";

      /** An internal reference to this singleton */
      private static CuratorClient instance = null;


      /** The connection String that Curator uses to connect to zookeeper. (e.g. 10.48.136.128:2181)
*/
      private String connectionString = null;

      /** The Zookeeper connection */
      private CuratorFramework client = null;

      protected static int SESSION_TIMEOUT = 30000;
      protected static int CONNECTION_TIMEOUT = 15000;
      protected static int RETRY_INTERVAL = 2000;

      /** Have I been Closed */
      private boolean closed = false;

      /**
      * Private Constructor for Singleton
      * @throws Exception if no Connection could be established with zookeeper after 15 seconds.
       */
      protected CuratorClient() throws RuntimeException  {
            connectClient();
      }


      private void connectClient() throws RuntimeException {
            try {
                  if (connectionString == null) {
                        connectionString = messages.getRawMessage(CONFIG, ZOOKEEPER_CONN);
                  }

                  if (StringUtils.isEmpty(connectionString))      {
                        logger.error("<Fatal> Error zookeeper.connection.string property
not defined in config properties file");
                        alertLog.error("<Fatal> Error zookeeper.connection.string property
not defined in config properties file");
                  }

                  client = CuratorFrameworkFactory.newClient(connectionString, SESSION_TIMEOUT,
CONNECTION_TIMEOUT, new RetryForeverPolicy(60000,60000,RETRY_INTERVAL));
                  client.getConnectionStateListenable().addListener(this);
                  client.start();

                  //hold out here until we are connected
                  client.getZookeeperClient().blockUntilConnectedOrTimedOut();
                  if (client.getState() != CuratorFrameworkState.STARTED) {
                        logger.error("<Fatal> Could not establish a connection to zookeeper
after Connection Timeout: " + CONNECTION_TIMEOUT);
                        throw new Exception("zookeeper startup timed out");
                  }

                  if (logger.isDebugEnabled()) logger.debug("Successfully Establised a Connection
with Zookeeper.");

                  closed = false;

            } catch (Throwable t) {
                  logger.error("Problem Creating Curator Client", t);
                  throw new RuntimeException("Problem Creating Connection with Zookeeper",
t);
            }
      }



      public CuratorFramework getClient() {
            if (!closed) {
                  return client;
            } else {
                  connectClient();
                  return client;
            }
      }

      /**
      * Should Only be Called by the Container when the Application is Shutdown
      */
      public void Close() {
            CloseableUtils.closeQuietly(client);
            client = null;
            closed = true;
            logger.info("Closed....");
      }



      /**
      * Gets access to the Singleton Instance.
      * @return The CuratorClient
      * @throws Exception if the Client cannot be created
      */
      public static CuratorClient getInstance()  {

            if (instance == null) {

                  synchronized(logger) {

                        if (instance == null) {

                              //Use a temporary pool variable and set it at last to the static
"pool" object to avoid thread contention.
                              instance = new CuratorClient();
                              if (logger.isDebugEnabled())  logger.debug("Created new Instace
of the CuratorClient that is used by the application");
                        }
                  }

            }
            return instance;
      }


      @Override
      public void stateChanged(CuratorFramework client, ConnectionState newState) {

            if (newState == ConnectionState.CONNECTED) {
                  logger.info("Zookeeper Connection State: CONNECTED");
            }
            else if (newState == ConnectionState.RECONNECTED) {

                  logger.info("Zookeeper Connection State: RECONNECTED");
            }
            else if (newState == ConnectionState.SUSPENDED) {
                  logger.info("Zookeeper Connection State: SUSPENDED");
            }
            else if (newState == ConnectionState.LOST) {
                  logger.error("Zookeeper Connection State: LOST");
            }
      }


}


From: Jordan Zimmerman [mailto:jordan@jordanzimmerman.com]
Sent: Tuesday, July 19, 2016 3:13 PM
To: user@curator.apache.org
Subject: Re: Help with SharedValue FailOver

Can you post the code for CuratorClient.getInstance()?

On Jul 19, 2016, at 2:04 PM, Cantrell, Curtis <Curtis.Cantrell@bkfs.com<mailto:Curtis.Cantrell@bkfs.com>>
wrote:

I’m a little lost with using the SharedValue.     I have upgraded to Curator 3.2.0.

When I start the class, the sharedValue connects to ZK and reads the value.    If I update
the value from another client, it is read ok.   But I cannot get the SharedValue to failover
to another zk server.

I have registered a ValueChangeListener, and when I kill the zookeeper that it is connected
to, I can see the stateChanged is invoked and “Connection Iteruption” is logged.

I think my biggest question is Why do I never get a RECONNECTED.  Nor will the sharedValue
receive updates from zookeeper anylonger.

The background EventThread simple is hung retrying…..   (This is my own RetryForeverPolicy
so I can log inside of it)

Here is the log output…

<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient
: Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0>
com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 0, ElapsedTimeMs: 37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 1, ElapsedTimeMs: 2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 2, ElapsedTimeMs: 4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 3, ElapsedTimeMs: 6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 4, ElapsedTimeMs: 8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 5, ElapsedTimeMs: 10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 6, ElapsedTimeMs: 12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 7, ElapsedTimeMs: 14139

Here is my test code..

package com.bkfs.mg.rules;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;
import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;


public class TestSharedValue  {

      private final static DeferLogger logger = DeferLogger.getLogger(TestSharedValue.class);

      private SharedValue sharedUpdateValue;

      private static final String SHARED_PATH = "/sharedValues/enginePool/lastUpdate";

      private long lastUpdated = 0;

      public void close() {
            CloseableUtils.closeQuietly(sharedUpdateValue);
            sharedUpdateValue = null;
      }

      public TestSharedValue() {

            CuratorFramework client = CuratorClient.getInstance().getClient();

            sharedUpdateValue = new SharedValue(client, SHARED_PATH, "0".toString().getBytes());
            sharedUpdateValue.getListenable().addListener(new ValueChangedListener());

            try {
                  sharedUpdateValue.start();
            } catch (Exception e) {
                  logger.error("There was a problem Starting the sharedValue after a Reconnect.",e);
            }

            String value = new String(sharedUpdateValue.getValue());
            lastUpdated = Long.valueOf(value);
      }

      public long getLastUpdated() {
            return lastUpdated;
      }

      public class ValueChangedListener implements SharedValueListener {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                  if ( client.getConnectionStateErrorPolicy().isErrorState(newState) )   {
                        logger.debug("Connection Interuption");
                  } else
                  if (newState == ConnectionState.RECONNECTED) {
                        logger.debug("Connection Restored");
                  }
            }

            @Override
            public void valueHasChanged(SharedValueReader valueReader, byte[] newValue)  
   throws Exception {

                  String value = new String(newValue);
                  long newLong = new Long(value).longValue();

            if (getLastUpdated() != newLong) {
                  TestSharedValue.this.lastUpdated = newLong;
            }
            }
      }
}

Thank you,
Curtis



The information contained in this message is proprietary and/or confidential. If you are not
the intended recipient, please: (i) delete the message and all copies; (ii) do not disclose,
distribute or use the message in any manner; and (iii) notify the sender immediately. In addition,
please be aware that any message addressed to our domain is subject to archiving and review
by persons other than the intended recipient. Thank you.

The information contained in this message is proprietary and/or confidential. If you are not
the intended recipient, please: (i) delete the message and all copies; (ii) do not disclose,
distribute or use the message in any manner; and (iii) notify the sender immediately. In addition,
please be aware that any message addressed to our domain is subject to archiving and review
by persons other than the intended recipient. Thank you.
Mime
View raw message