curator-user mailing list archives

From "Cantrell, Curtis" <Curtis.Cantr...@bkfs.com>
Subject RE: Help with SharedValue FailOver
Date Tue, 19 Jul 2016 20:32:05 GMT
When I started my client code in the application, it established a connection with zk server 3.

2016-07-19 16:17:15,034 [myid:3] - INFO  [CommitProcWorkThread-1:ZooKeeperServer@678] - Established
session 0x30010a1e7150000 with negotiated timeout 30000 for client /127.0.0.1:60293

The zk leader is server 2, so I see the NodeExists exception on server 2, as expected.

2016-07-19 16:17:15,086 [myid:2] - INFO  [ProcessThread(sid:2 cport:-1)::PrepRequestProcessor@843]
- Got user-level KeeperException when processing sessionid:0x30010a1e7150000 type:create2
cxid:0x2 zxid:0x250000000b txntype:-1 reqpath:n/a Error Path:/sharedValues/enginePool/lastUpdate
Error:KeeperErrorCode = NodeExists for /sharedValues/enginePool/lastUpdate

Now I am going to kill server 3, where the connection is established. I expect that
the connection will be re-created on either server 1 or 2 and the session will be reestablished.

When I kill server 3…

The client APPLICATION logged:

<Jul 19, 2016 16:21:20 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.mg.rules.TestSharedValue
: Value Changed
<Jul 19, 2016 16:21:20 PM EDT><DEBUG><Curator-ConnectionStateManager-0>
com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 16:21:20 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient
: Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 16:21:35 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.mg.rules.TestSharedValue
: Value Changed
Then zk server 2 accepted the connection

07-19 16:21:20,713 [myid:2] - INFO  [NIOServerCxnFactory.AcceptThread:0.0.0.0/0.0.0.0:2183:NIOServerCnxnFactory$AcceptThread@296]
- Accepted socket connection from /127.0.0.1:60305
07-19 16:21:20,715 [myid:2] - INFO  [NIOWorkerThread-10:ZooKeeperServer@969] - Client attempting
to renew session 0x30010a1e7150000 at /127.0.0.1:60305
07-19 16:21:20,717 [myid:2] - INFO  [NIOWorkerThread-10:ZooKeeperServer@678] - Established
session 0x30010a1e7150000 with negotiated timeout 30000 for client /127.0.0.1:60305
07-19 16:21:50,732 [myid:2] - WARN  [NIOWorkerThread-13:NIOServerCnxn@365] - Unable to read
additional data from client sessionid 0x30010a1e7150000, likely client has closed socket
07-19 16:21:50,733 [myid:2] - INFO  [NIOWorkerThread-13:MBeanRegistry@119] - Unregister MBean
[org.apache.ZooKeeperService:name0=ReplicatedServer_id2,name1=replica.2,name2=Leader,name3=Connections,name4=127.0.0.1,name5=0x30010a1e715000
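For reference, a small sketch (not part of the original message) that computes the gap between the "Established session" line and the "Unable to read additional data" line in the server 2 log above, so it can be compared with the negotiated timeout of 30000 ms. The names `SessionGap` and `gapMillis` are illustrative. Whether the timeout is what caused the socket to close is an assumption; the log alone does not confirm it.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

final class SessionGap {
    // Matches the time-of-day portion of the ZooKeeper log timestamps, e.g. 16:21:20,717
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("HH:mm:ss,SSS");

    /** Milliseconds between two time-of-day stamps taken from the same day. */
    static long gapMillis(String from, String to) {
        return Duration.between(LocalTime.parse(from, FMT), LocalTime.parse(to, FMT)).toMillis();
    }

    public static void main(String[] args) {
        // Timestamps copied from the server 2 log above.
        long gap = gapMillis("16:21:20,717", "16:21:50,732");
        System.out.println("gap ms = " + gap + ", negotiated timeout ms = 30000");
    }
}
```

If the gap is close to the negotiated timeout, one thing worth checking is whether the client sent anything on the renewed connection during that window.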

Then on the APPLICATION client, after a 25 second pause…  The EventThread is stuck
forever…

<Jul 19, 2016 16:21:50 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 0, ElapsedTimeMs: 51
<Jul 19, 2016 16:21:52 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 1, ElapsedTimeMs: 2093
<Jul 19, 2016 16:21:54 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 2, ElapsedTimeMs: 4064
<Jul 19, 2016 16:21:56 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 3, ElapsedTimeMs: 6064
<Jul 19, 2016 16:21:58 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 4, ElapsedTimeMs: 8108
<Jul 19, 2016 16:22:00 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 5, ElapsedTimeMs: 10080
<Jul 19, 2016 16:22:02 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 6, ElapsedTimeMs: 12111
<Jul 19, 2016 16:22:04 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 7, ElapsedTimeMs: 14082
<Jul 19, 2016 16:22:06 PM EDT><DEBUG><server.startup : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 8, ElapsedTimeMs: 16125
From the zk logs, it appears that Curator closed the connection right after establishing
it on zk server 2.

Curtis


From: Cantrell, Curtis [mailto:Curtis.Cantrell@bkfs.com]
Sent: Tuesday, July 19, 2016 4:02 PM
To: user@curator.apache.org
Subject: RE: Help with SharedValue FailOver

No, all three of the zk servers are being passed in the connection string. I’m having
no trouble with my LeaderSelector failing over properly. But when I added a simple SharedValue,
the EventThread goes crazy. That is why I created this simple Test class. I have no idea
what I’m doing wrong with this SharedValue.

Curator 3.2.0, ZK 3.5.1-alpha. I am moving the CuratorFramework client to a single connection
per application, like you told me to earlier, so I have been doing some refactoring.

Unfortunately, the SharedValue will not work.   At least, I cannot get it to work.

I guess tomorrow I will try rolling back a few versions of Curator, unless someone has a better
idea.

Thank you,
Curtis


From: Jordan Zimmerman [mailto:jordan@jordanzimmerman.com]
Sent: Tuesday, July 19, 2016 3:41 PM
To: user@curator.apache.org
Subject: Re: Help with SharedValue FailOver

My guess is that you’re only passing one of the ZK instances in the connection string. You
should be passing them all. A typical connection string is:

            "10.2.3.4:2181,10.2.3.5:2181,10.2.3.6:2181"
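A quick way to confirm what is actually being passed is to split the configured string and count the entries. The sketch below is illustrative only: `ConnectString` and `servers` are made-up names, and it assumes every entry has the plain host:port form (no chroot suffix, no omitted port).

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectString {
    /** Splits "host:port,host:port" into entries, rejecting malformed ones. */
    static List<String> servers(String connectString) {
        List<String> out = new ArrayList<>();
        for (String part : connectString.split(",")) {
            String s = part.trim();
            int colon = s.lastIndexOf(':');
            if (colon <= 0 || colon == s.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got: " + s);
            }
            // Throws NumberFormatException if the port is not numeric.
            Integer.parseInt(s.substring(colon + 1));
            out.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> servers = servers("10.2.3.4:2181,10.2.3.5:2181,10.2.3.6:2181");
        System.out.println(servers.size() + " servers: " + servers);
    }
}
```

Logging the parsed list at startup makes it obvious when only one ensemble member was configured.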

On Jul 19, 2016, at 2:36 PM, Cantrell, Curtis <Curtis.Cantrell@bkfs.com> wrote:

package com.bkfs.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.CloseableUtils;

import com.lps.mg.log.DeferLogger;
import com.lps.mg.utils.MessageResources;
import com.lps.mg.utils.StringUtils;

public class CuratorClient implements ConnectionStateListener {

      /** Log4J Logger for this class     */
      private final static DeferLogger logger = DeferLogger.getLogger(CuratorClient.class);
      private static DeferLogger alertLog = DeferLogger.getLogger("SpectrumAlert");

      protected static MessageResources messages = MessageResources.getInstance();

      /** ZooKeeper properties in the config.properties file*/
      protected static final String ZOOKEEPER_CONN = "zookeeper.connection.string";

      protected static final String CONFIG         = "config";

      /** An internal reference to this singleton */
      private static volatile CuratorClient instance = null;


      /** The connection String that Curator uses to connect to zookeeper. (e.g. 10.48.136.128:2181) */
      private String connectionString = null;

      /** The Zookeeper connection */
      private CuratorFramework client = null;

      protected static int SESSION_TIMEOUT = 30000;
      protected static int CONNECTION_TIMEOUT = 15000;
      protected static int RETRY_INTERVAL = 2000;

      /** Have I been Closed */
      private boolean closed = false;

      /**
      * Private Constructor for Singleton
      * @throws Exception if no Connection could be established with zookeeper after 15 seconds.
       */
      protected CuratorClient() throws RuntimeException  {
            connectClient();
      }


      private void connectClient() throws RuntimeException {
            try {
                  if (connectionString == null) {
                        connectionString = messages.getRawMessage(CONFIG, ZOOKEEPER_CONN);
                  }

                  if (StringUtils.isEmpty(connectionString))      {
                        logger.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
                        alertLog.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
                  }

                  client = CuratorFrameworkFactory.newClient(connectionString, SESSION_TIMEOUT,
                              CONNECTION_TIMEOUT, new RetryForeverPolicy(60000, 60000, RETRY_INTERVAL));
                  client.getConnectionStateListenable().addListener(this);
                  client.start();

                  // Hold out here until we are connected. The call returns false if the
                  // connection timeout elapses first; getState() is STARTED right after start()
                  // either way, so it cannot be used to detect a failed connect.
                  if (!client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                        logger.error("<Fatal> Could not establish a connection to zookeeper after Connection Timeout: " + CONNECTION_TIMEOUT);
                        throw new Exception("zookeeper startup timed out");
                  }

                  if (logger.isDebugEnabled()) logger.debug("Successfully Established a Connection with Zookeeper.");

                  closed = false;

            } catch (Throwable t) {
                  logger.error("Problem Creating Curator Client", t);
                  throw new RuntimeException("Problem Creating Connection with Zookeeper", t);
            }
      }



      public CuratorFramework getClient() {
            if (!closed) {
                  return client;
            } else {
                  connectClient();
                  return client;
            }
      }

      /**
      * Should Only be Called by the Container when the Application is Shutdown
      */
      public void Close() {
            CloseableUtils.closeQuietly(client);
            client = null;
            closed = true;
            logger.info("Closed....");
      }



      /**
      * Gets access to the Singleton Instance.
      * @return The CuratorClient
      * @throws Exception if the Client cannot be created
      */
      public static CuratorClient getInstance()  {

            if (instance == null) {

                  synchronized(logger) {

                        if (instance == null) {

                              // Create the singleton inside the lock so that only one thread constructs it.
                              instance = new CuratorClient();
                              if (logger.isDebugEnabled())  logger.debug("Created new Instance of the CuratorClient that is used by the application");
                        }
                  }

            }
            return instance;
      }


      @Override
      public void stateChanged(CuratorFramework client, ConnectionState newState) {

            if (newState == ConnectionState.CONNECTED) {
                  logger.info("Zookeeper Connection State: CONNECTED");
            }
            else if (newState == ConnectionState.RECONNECTED) {

                  logger.info("Zookeeper Connection State: RECONNECTED");
            }
            else if (newState == ConnectionState.SUSPENDED) {
                  logger.info("Zookeeper Connection State: SUSPENDED");
            }
            else if (newState == ConnectionState.LOST) {
                  logger.error("Zookeeper Connection State: LOST");
            }
      }


}
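Double-checked locking, as used in getInstance() above, is only safe under the Java memory model when the checked field is volatile. An alternative that sidesteps the question is the initialization-on-demand holder idiom, sketched below. `LazySingleton` is a stand-in name, not the poster's class, and the construction counter exists only to make the behavior observable. Note that this idiom does not cover the reconnect-after-Close() path that getClient() implements.

```java
final class LazySingleton {
    private static int created = 0; // counts constructions, for demonstration only

    private LazySingleton() {
        created++;
    }

    // The JVM initializes Holder at most once, on first access, under its class-initialization lock.
    private static final class Holder {
        static final LazySingleton INSTANCE = new LazySingleton();
    }

    static LazySingleton getInstance() {
        return Holder.INSTANCE;
    }

    static int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + (getInstance() == getInstance()));
        System.out.println("constructed " + createdCount() + " time(s)");
    }
}
```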


From: Jordan Zimmerman [mailto:jordan@jordanzimmerman.com]
Sent: Tuesday, July 19, 2016 3:13 PM
To: user@curator.apache.org
Subject: Re: Help with SharedValue FailOver

Can you post the code for CuratorClient.getInstance()?

On Jul 19, 2016, at 2:04 PM, Cantrell, Curtis <Curtis.Cantrell@bkfs.com> wrote:

I’m a little lost with using the SharedValue. I have upgraded to Curator 3.2.0.

When I start the class, the SharedValue connects to ZK and reads the value. If I update
the value from another client, it is read OK. But I cannot get the SharedValue to fail over
to another zk server.

I have registered a ValueChangedListener, and when I kill the zookeeper that it is connected
to, I can see that stateChanged is invoked and “Connection Interuption” is logged.

I think my biggest question is: why do I never get a RECONNECTED? Nor does the SharedValue
receive updates from zookeeper any longer.

The background EventThread is simply hung retrying… (This is my own RetryForeverPolicy,
so I can log inside of it.)
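The RetryForeverPolicy class itself is not shown in this thread. As a rough idea of what a logging retry-forever policy could look like, here is a sketch with the same method shape as Curator's RetryPolicy.allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper). Everything in it is an assumption: `LoggingRetryForever` is a hypothetical name, the nested `Sleeper` interface is a local stand-in for Curator's RetrySleeper so the sketch compiles without Curator, and the single constructor argument does not correspond to the three arguments used in CuratorClient.

```java
import java.util.concurrent.TimeUnit;

final class LoggingRetryForever {
    /** Local stand-in for Curator's RetrySleeper (assumption, see lead-in). */
    interface Sleeper {
        void sleepFor(long time, TimeUnit unit) throws InterruptedException;
    }

    private final long retryIntervalMs;

    LoggingRetryForever(long retryIntervalMs) {
        this.retryIntervalMs = retryIntervalMs;
    }

    /** Logs the attempt, sleeps for the interval, and always asks for another retry. */
    boolean allowRetry(int retryCount, long elapsedTimeMs, Sleeper sleeper) {
        System.out.println("Retrying Again. Count: " + retryCount + ", ElapsedTimeMs: " + elapsedTimeMs);
        try {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;                       // stop retrying once interrupted
        }
        return true;
    }

    public static void main(String[] args) {
        LoggingRetryForever policy = new LoggingRetryForever(2000);
        // A no-op sleeper keeps the demo instant; a real one would block for the interval.
        boolean again = policy.allowRetry(0, 37, (time, unit) -> { });
        System.out.println("retry again: " + again);
    }
}
```

A policy that always returns true keeps the calling thread inside the retry loop for as long as the operation keeps failing, which would be consistent with the repeating log lines, though the thread does not establish that this is the cause.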

Here is the log output…

<Jul 19, 2016 14:50:55 PM EDT><INFO><Curator-ConnectionStateManager-0> com.bkfs.curator.CuratorClient
: Zookeeper Connection State: SUSPENDED
<Jul 19, 2016 14:50:55 PM EDT><DEBUG><Curator-ConnectionStateManager-0>
com.bkfs.mg.rules.TestSharedValue : Connection Interuption
<Jul 19, 2016 14:51:25 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 0, ElapsedTimeMs: 37
<Jul 19, 2016 14:51:27 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 1, ElapsedTimeMs: 2038
<Jul 19, 2016 14:51:29 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 2, ElapsedTimeMs: 4038
<Jul 19, 2016 14:51:31 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 3, ElapsedTimeMs: 6076
<Jul 19, 2016 14:51:33 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 4, ElapsedTimeMs: 8047
<Jul 19, 2016 14:51:35 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 5, ElapsedTimeMs: 10107
<Jul 19, 2016 14:51:37 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 6, ElapsedTimeMs: 12078
<Jul 19, 2016 14:51:39 PM EDT><DEBUG><Default : 2-EventThread> com.bkfs.curator.RetryForeverPolicy
: Retrying Again. Count: 7, ElapsedTimeMs: 14139

Here is my test code..

package com.bkfs.mg.rules;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedValue;
import org.apache.curator.framework.recipes.shared.SharedValueListener;
import org.apache.curator.framework.recipes.shared.SharedValueReader;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.utils.CloseableUtils;
import com.bkfs.curator.CuratorClient;
import com.lps.mg.log.DeferLogger;


public class TestSharedValue  {

      private final static DeferLogger logger = DeferLogger.getLogger(TestSharedValue.class);

      private SharedValue sharedUpdateValue;

      private static final String SHARED_PATH = "/sharedValues/enginePool/lastUpdate";

      private long lastUpdated = 0;

      public void close() {
            CloseableUtils.closeQuietly(sharedUpdateValue);
            sharedUpdateValue = null;
      }

      public TestSharedValue() {

            CuratorFramework client = CuratorClient.getInstance().getClient();

            sharedUpdateValue = new SharedValue(client, SHARED_PATH, "0".getBytes());
            sharedUpdateValue.getListenable().addListener(new ValueChangedListener());

            try {
                  sharedUpdateValue.start();
            } catch (Exception e) {
                  logger.error("There was a problem Starting the sharedValue after a Reconnect.",e);
            }

            String value = new String(sharedUpdateValue.getValue());
            lastUpdated = Long.valueOf(value);
      }

      public long getLastUpdated() {
            return lastUpdated;
      }

      public class ValueChangedListener implements SharedValueListener {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {

                  if ( client.getConnectionStateErrorPolicy().isErrorState(newState) )   {
                        logger.debug("Connection Interuption");
                  } else
                  if (newState == ConnectionState.RECONNECTED) {
                        logger.debug("Connection Restored");
                  }
            }

            @Override
            public void valueHasChanged(SharedValueReader valueReader, byte[] newValue) throws Exception {

                  String value = new String(newValue);
                  long newLong = Long.parseLong(value);

                  if (getLastUpdated() != newLong) {
                        TestSharedValue.this.lastUpdated = newLong;
                  }
            }
      }
}
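The test class converts between byte[] and long using the platform default charset and lets a malformed value throw from the listener. A slightly more defensive variant is sketched below, assuming the node always holds a decimal number encoded as text. `ValueCodec`, `encode`, and `decode` are illustrative names and not part of Curator.

```java
import java.nio.charset.StandardCharsets;

final class ValueCodec {
    /** Encodes a long as decimal text in UTF-8, independent of the platform charset. */
    static byte[] encode(long value) {
        return Long.toString(value).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns the decoded value, or fallback if the bytes are not a decimal long. */
    static long decode(byte[] data, long fallback) {
        try {
            return Long.parseLong(new String(data, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(1468959675000L), 0L));
    }
}
```

Using the same helper for the seed value and for valueHasChanged keeps writers and readers on one encoding.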

Thank you,
Curtis



The information contained in this message is proprietary and/or confidential. If you are not
the intended recipient, please: (i) delete the message and all copies; (ii) do not disclose,
distribute or use the message in any manner; and (iii) notify the sender immediately. In addition,
please be aware that any message addressed to our domain is subject to archiving and review
by persons other than the intended recipient. Thank you.
