curator-user mailing list archives

From "Cantrell, Curtis" <Curtis.Cantr...@bkfs.com>
Subject Problem with LeaderSelector 2.7.1
Date Wed, 13 Jul 2016 12:57:32 GMT
I need some help. I think either I am not handling the SUSPENDED or LOST connection states correctly,
or there is something strange going on.

For every LeaderSelector, there are thousands of lock nodes in ZooKeeper.

Even though we have fewer than 10 znodes, the snapshots on some of the servers are over 250 MB.

These are the session details:

Session Details (sid, timeout, ephemeralCount):
0x355dcf893490000, 15000, 277505
0x255ddebc89b0000, 40000, 0
0x355dcf893490001, 15000, 291456
0x355dcf893490002, 40000, 0
0x455dcf898640001, 15000, 119078
0x155dad98ce60003, 40000, 0
0x455dcf898640002, 15000, 107271
0x155dcf893590000, 15000, 432353
0x255dcf893390000, 15000, 131946
0x155dcf893590003, 40000, 0
0x155de3d21710001, 15000, 177958
0x255dadcacd40005, 40000, 0
0x155de3d21710000, 15000, 69169
0x555dcf90d300001, 40000, 0
0x555dcf90d300003, 40000, 0
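A quick tally of this table: all of the ephemeral nodes (about 1.6 million in total) belong to the eight 15000 ms sessions, which is consistent with those being the Curator clients created in start() with a 15000 ms session timeout; the seven 40000 ms sessions hold none. The sketch below just does that sum from the rows as pasted. SessionTotals is a throwaway name for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Totals the ephemeralCount column of the session dump, grouped by session timeout. */
public class SessionTotals {

    // The 15 "sid, timeout, ephemeralCount" rows from the session details, copied verbatim
    static final String DUMP = String.join("\n",
            "0x355dcf893490000, 15000, 277505",
            "0x255ddebc89b0000, 40000, 0",
            "0x355dcf893490001, 15000, 291456",
            "0x355dcf893490002, 40000, 0",
            "0x455dcf898640001, 15000, 119078",
            "0x155dad98ce60003, 40000, 0",
            "0x455dcf898640002, 15000, 107271",
            "0x155dcf893590000, 15000, 432353",
            "0x255dcf893390000, 15000, 131946",
            "0x155dcf893590003, 40000, 0",
            "0x155de3d21710001, 15000, 177958",
            "0x255dadcacd40005, 40000, 0",
            "0x155de3d21710000, 15000, 69169",
            "0x555dcf90d300001, 40000, 0",
            "0x555dcf90d300003, 40000, 0");

    /** Sums ephemeral node counts per session timeout (ms); lines that are not session rows are skipped. */
    static Map<Integer, Long> totalsByTimeout(String dump) {
        Map<Integer, Long> totals = new LinkedHashMap<>();
        for (String line : dump.split("\n")) {
            String[] parts = line.split(",");
            if (parts.length != 3 || !parts[0].trim().startsWith("0x")) {
                continue;
            }
            int timeout = Integer.parseInt(parts[1].trim());
            long ephemerals = Long.parseLong(parts[2].trim());
            totals.merge(timeout, ephemerals, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(totalsByTimeout(DUMP));
    }
}
```

Running it prints `{15000=1606736, 40000=0}`.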


Here is a sample of the contents of the snapshot:

/leaders/queue_monitor/_c_64963b25-d6bf-44e7-b9b4-f3dace29eda5-lock-0005238285
  cZxid = 0x00015400171ad3
  ctime = Tue Jul 12 01:53:23 EDT 2016
  mZxid = 0x00015400171ad3
  mtime = Tue Jul 12 01:53:23 EDT 2016
  pZxid = 0x00015400171ad3
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x355dcf893490000
  dataLength = 11
----
/leaders/queue_monitor/_c_ef509190-f429-4a43-b391-3d0c0e3ed12f-lock-0005416045
  cZxid = 0x0001540020bb67
  ctime = Tue Jul 12 03:05:52 EDT 2016
  mZxid = 0x0001540020bb67
  mtime = Tue Jul 12 03:05:52 EDT 2016
  pZxid = 0x0001540020bb67
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x355dcf893490000
  dataLength = 11
----
/leaders/queue_monitor/_c_1b80457b-5c17-4732-8605-27252a1bb4ad-lock-0005286264
  cZxid = 0x00015400195658
  ctime = Tue Jul 12 02:12:59 EDT 2016
  mZxid = 0x00015400195658
  mtime = Tue Jul 12 02:12:59 EDT 2016
  pZxid = 0x00015400195658
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x355dcf893490000
  dataLength = 11
----
  mZxid = 0x0001540020bb62
  mtime = Tue Jul 12 03:05:52 EDT 2016
  pZxid = 0x0001540020bb62
  cversion = 0
  dataVersion = 0
  aclVersion = 0
  ephemeralOwner = 0x355dcf893490000
  dataLength = 11
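The sequence suffixes in this sample say something about the amount of churn. ZooKeeper takes a sequential node's suffix from a counter on the parent that only increases as its children change, so the gap between two suffixes is a rough upper bound on how many lock nodes were created under /leaders/queue_monitor in between. The first two entries (same session, created at 01:53:23 and 03:05:52 on the same day) give a gap of 177,760 over 4,349 seconds, roughly 41 per second sustained for over an hour. The sketch below only reproduces that arithmetic; LockSequenceRate is a made-up helper name.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Rough rate of lock-node churn under /leaders/queue_monitor from two sample entries. */
public class LockSequenceRate {

    // Lock nodes in the sample are named _c_<uuid>-lock-<10-digit sequence suffix>
    private static final Pattern LOCK_SUFFIX = Pattern.compile("-lock-(\\d{10})$");

    /** Returns the numeric sequence suffix of a lock node path. */
    static long sequenceOf(String path) {
        Matcher m = LOCK_SUFFIX.matcher(path);
        if (!m.find()) {
            throw new IllegalArgumentException("not a lock node: " + path);
        }
        return Long.parseLong(m.group(1));
    }

    /** Seconds between two same-day wall-clock times such as "01:53:23". */
    static long secondsBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).getSeconds();
    }

    public static void main(String[] args) {
        long first = sequenceOf("/leaders/queue_monitor/_c_64963b25-d6bf-44e7-b9b4-f3dace29eda5-lock-0005238285");
        long later = sequenceOf("/leaders/queue_monitor/_c_ef509190-f429-4a43-b391-3d0c0e3ed12f-lock-0005416045");
        long gap = later - first;
        long seconds = secondsBetween("01:53:23", "03:05:52");
        System.out.printf("%d sequence numbers in %d s (about %.1f per second)%n",
                gap, seconds, (double) gap / seconds);
    }
}
```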


We only have 5 servers, so there should be only 5 participants queued for leadership, but there are
hundreds of thousands of lock nodes.

I am using Curator 2.7.1 and ZooKeeper 3.4.6.

When the server starts up, start() is called; it creates a CuratorFramework client and then
starts the LeaderSelector.

When takeLeadership is called, it repeatedly calls a DAO to monitor some internal system queue
depths. If there is a problem, an error message is logged. This continues while the
client is connected.

The connected state is monitored and updated in stateChanged().
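Two details of the posted loop interact with this flag. First, connected and isClosed are plain boolean fields written by stateChanged()/close() on a different thread from the one running takeLeadership(); Java only guarantees that the loop sees those writes with some form of synchronization, such as declaring the fields volatile. Second, SUSPENDED only clears the flag, so the loop notices it after the current sleep (up to a full check interval), and the empty catch block keeps looping after an interrupt unless a flag has already changed. The sketch below is a stdlib-only model of that loop, not a drop-in replacement: MonitorLoop and doOneCheck() are hypothetical names, and doOneCheck() stands in for the ConsumerDAO call.

```java
/**
 * Stdlib-only model of the takeLeadership() loop with flags that are safe to change
 * from another thread. MonitorLoop and doOneCheck() are hypothetical names.
 */
public class MonitorLoop implements Runnable {

    // volatile guarantees the loop thread sees writes made by the connection-state thread
    private volatile boolean connected = true;
    private volatile boolean closed = false;
    private volatile int checks = 0; // written only by the loop thread

    private final long intervalMillis;

    public MonitorLoop(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public void setConnected(boolean connected) { this.connected = connected; }

    public void close() { this.closed = true; }

    public int getChecks() { return checks; }

    // Stands in for the ConsumerDAO queue-depth check
    private void doOneCheck() {
        checks++;
    }

    @Override
    public void run() {
        while (connected && !closed) {
            doOneCheck();
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException ie) {
                // Keep the interrupt status and leave the loop rather than swallowing it
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** SUSPENDED-style scenario: clear the flag, interrupt the sleep, report whether the loop exited. */
    static boolean stopsPromptly(MonitorLoop loop) {
        Thread worker = new Thread(loop, "leadership");
        worker.setDaemon(true); // never keep the JVM alive if the loop fails to stop
        worker.start();
        try {
            Thread.sleep(100);        // give the first check a chance to run
            loop.setConnected(false); // what the posted stateChanged() does on SUSPENDED
            worker.interrupt();       // wake it instead of waiting out the interval
            worker.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        MonitorLoop loop = new MonitorLoop(60_000); // a long, minute-scale interval
        System.out.println("stopped: " + stopsPromptly(loop) + ", checks: " + loop.getChecks());
    }
}
```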

If the state change is SUSPENDED, the class records that we are no longer connected, and
takeLeadership returns. This gives up leadership.
If the state change is LOST, close() is called, which closes the leaderSelector and sets it to null.
If the state change is RECONNECTED and the LOST handling has already destroyed the leaderSelector,
a new leaderSelector is created and queued up. (I suspect this is the logic that is causing the
problem, but I don't know the right way to handle LOST and RECONNECTED.)
If the state change is RECONNECTED and the SUSPENDED handling has already released leadership by
setting the connected flag to false, connected is set back to true.
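One detail in the posted code may bear on that suspicion: close() sets isClosed = true, but startLeaderSelector() only calls setConnected(true) and never resets isClosed. If nothing in the removed methods resets it, a LeaderSelector created on RECONNECTED would see a closed flag, takeLeadership() would return immediately, and autoRequeue() would queue it again as soon as it returns, so leadership (and a lock node) would be taken and released over and over without any queue checks running. The model below mirrors only the flag logic; FlagModel is a hypothetical name and it does not talk to ZooKeeper or Curator, so it does not by itself explain why lock nodes accumulate rather than being deleted.

```java
/**
 * Stdlib-only model of the flag handling in the posted QueueMonitorLeader.
 * FlagModel is a hypothetical name; it does not talk to ZooKeeper or Curator.
 */
public class FlagModel {

    boolean connected = true;
    boolean isClosed = false;

    // Mirrors startLeaderSelector() as posted: resets connected, but not isClosed
    void startLeaderSelector() {
        connected = true;
    }

    // Mirrors close(), which stateChanged() calls on LOST
    void close() {
        isClosed = true;
    }

    // Mirrors the takeLeadership() loop condition; returns how many queue checks would run (capped)
    int takeLeadership(int maxChecks) {
        int checks = 0;
        while (connected && !isClosed && checks < maxChecks) {
            checks++;
        }
        return checks;
    }

    public static void main(String[] args) {
        FlagModel leader = new FlagModel();
        System.out.println("before LOST: " + leader.takeLeadership(3) + " checks");

        leader.close();               // LOST
        leader.startLeaderSelector(); // RECONNECTED with leaderSelector == null
        System.out.println("after LOST + RECONNECTED: " + leader.takeLeadership(3) + " checks");
    }
}
```

This prints `before LOST: 3 checks` and then `after LOST + RECONNECTED: 0 checks`. Resetting isClosed in startLeaderSelector() is one possible adjustment. Another direction worth checking against the Curator documentation: LeaderSelectorListenerAdapter's own stateChanged() cancels leadership on SUSPENDED and LOST by throwing CancelLeadershipException, this class overrides it, and with autoRequeue() a single LeaderSelector normally re-enters the queue by itself, so closing and recreating it may not be needed.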

shutdown() is called when the application or server is shutting down.

I have removed a few methods that are not needed for this question...


//----------------------------------------------
Here is the code...
//----------------------------------------------

import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.CloseableUtils;

import com.lps.mg.cache.MGCache;
import com.lps.mg.cache.MGCacheFactory;
import com.lps.mg.dao.ConsumerDAO;
import com.lps.mg.log.DeferLogger;
import com.lps.mg.utils.MessageResources;
import com.lps.mg.utils.StringUtils;

public class QueueMonitorLeader extends LeaderSelectorListenerAdapter implements ConnectionStateListener
{

    private static DeferLogger logger = DeferLogger.getLogger(QueueMonitorLeader.class);
    private static DeferLogger alertLog = DeferLogger.getLogger("SpectrumAlert");

    /** The Curator/ZooKeeper object that adds me to the queue to take leadership */
    private LeaderSelector leaderSelector = null;

    /** Flag to indicate we are shutting down due to a ZooKeeper connection loss */
    private boolean shuttingDownZk = false;

    /** The default monitoring interval */
    private int defaultInterval = 5;

    /** The default monitoring limit */
    private int defatulLimit = 100;

    private static final String ERR_610_0083_0001 = "610-0083-0001";
    private static final String ERR_610_0083_0002 = "610-0083-0002";

    protected static MessageResources messages = MessageResources.getInstance();
    protected static final String bundle = "mgmessages";

    /** Flag to indicate the leader has been closed */
    boolean isClosed = false;

    /** The thread that invoked takeLeadership */
    Thread leadershipThread = null;

    /** Flag to indicate there is a connection to the ZooKeeper server */
    private boolean connected = true;

    /** Used for unit testing */
    private boolean hasLeadership = false;

    // ZooKeeper connection string
    protected static final String ZOOKEEPER_CONN = "zookeeper.connection.string";

    /** Config property group and names for the queue monitoring */
    protected static final String PROP_GROUP_QUEUE = "Consumer_Queues";
    /** The positive integer interval, in minutes, at which the consumer queues are checked */
    protected static final String PROP_VALUE_INTERVAL = "Check_Interval";
    /** The positive integer upper limit on a queue's depth before a monitor alert is sent */
    protected static final String PROP_VALUE_LIMIT = "Alert_Threshold";

    /** The ZooKeeper connection */
    private CuratorFramework client = null;

    /** The path to the mutex lock this leader is watching in ZooKeeper */
    protected static final String zkNodePath = "/leaders/queue_monitor";

    /** Reference to the system cache */
    private static MGCache mgCache = MGCacheFactory.getMGCacheImpl();

    /** The connection string that Curator uses to connect to ZooKeeper (e.g. 10.48.136.128:2181) */
    String connectionString = null;

    public QueueMonitorLeader() { }

    public void start() {
        if (connectionString == null) {
            connectionString = messages.getRawMessage("config", ZOOKEEPER_CONN);
        }

        if (StringUtils.isEmpty(connectionString)) {
            logger.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
            alertLog.error("<Fatal> Error zookeeper.connection.string property not defined in config properties file");
        }

        client = CuratorFrameworkFactory.newClient(connectionString, 15000, 15000, new RetryNTimes(5, 200));
        client.start();
        client.getConnectionStateListenable().addListener(this);
        startLeaderSelector();
    }

    /**
     * The method that Curator calls when this class is given leadership. It keeps
     * leadership until this method returns, this server dies, or there is a
     * connection problem.
     */
    @Override
    public void takeLeadership(CuratorFramework arg0) throws Exception {
        hasLeadership = true;
        leadershipThread = Thread.currentThread();
        ConsumerDAO dao = new ConsumerDAO();
        while (connected && !this.isClosed) {
            try {
                if (logger.isDebugEnabled())
                    logger.debug("Checking the Consumer Queues.  The Interval: " + ((getInterval() / 1000) / 60) + ", Queue Limit: " + getLimit());
                List<ConsumerDAO.QueueMonitorData> queuesMonitorData = dao.getQueueMonitorData();

                for (String alert : getSpectrumAlerts(queuesMonitorData)) {
                    alertLog.error(alert);
                    logger.error(alert);
                }
                // Thread.sleep is static and always sleeps the current (leadership) thread
                Thread.sleep(getInterval());
            } catch (InterruptedException ie) {
                // the leadership thread was interrupted, so let go of the loop
            }
        }
        hasLeadership = false;
    }

    private void startLeaderSelector() {
        setConnected(true);
        leaderSelector = new LeaderSelector(client, zkNodePath, this);
        leaderSelector.autoRequeue();
        leaderSelector.start();
    }

    /**
     * Called by the ConnectionStateListener to update the network connection state.
     * This flag controls the leadership loop. If the connection is lost, we want
     * to drop out of the loop.
     */
    public void setConnected(boolean connected) {
        this.connected = connected;
    }


    /**
     * Should be called when the server is being shut down or the J2EE application
     * is being removed.
     */
    public void shutdown() {
        if (logger.isDebugEnabled()) logger.debug("Shutting Down...");
        close();
        CloseableUtils.closeQuietly(client);
        client = null;
    }

    /**
     * Should be called when the connection is lost or called by shutdown.
     */
    public void close() {
        // close the leaderSelector
        CloseableUtils.closeQuietly(leaderSelector);
        leaderSelector = null;

        // let the leadership loop know that we are closed
        this.isClosed = true;

        // interrupt the leadership thread so it will wake up if it is sleeping
        if (leadershipThread != null) {
            leadershipThread.interrupt();
        }

        // a little delay to let leadership be relinquished if I have it (e.g. the takeLeadership method returns)
        try {
            Thread.sleep(1000);
        }
        catch (Exception e) {
            // empty
        }
    }


    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if (newState == ConnectionState.CONNECTED) {
            setConnected(true);
        }
        else if (newState == ConnectionState.RECONNECTED) {
            // just in case this is a reconnect right after losing the connection and the Connection.LOST is still in process
            while (shuttingDownZk) {
                try {
                    Thread.sleep(500);
                }
                catch (Exception e) {
                    // empty
                }
            }
            if (leaderSelector != null) {
                setConnected(true);
            } else {
                // leaderSelector was destroyed by the connection lost
                startLeaderSelector();
            }
        }
        else if (newState == ConnectionState.SUSPENDED) {
            setConnected(false);
        }
        else if (newState == ConnectionState.LOST) {
            shuttingDownZk = true;
            close();
            shuttingDownZk = false;
        }
    }
}



