zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject [08/36] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - branch-3.4 - zookeeper-server
Date Wed, 24 Oct 2018 09:32:24 GMT
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
new file mode 100644
index 0000000..619bdc6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectableZooKeeper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+public class DisconnectableZooKeeper extends ZooKeeper { // test-only client exposing connection-level hooks
+    public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher)
+        throws IOException
+    {
+        super(host, sessionTimeout, watcher); // pure delegation to the ZooKeeper constructor
+    }
+    // Same as above, additionally forwarding a session id and password to the superclass.
+    public DisconnectableZooKeeper(String host, int sessionTimeout, Watcher watcher,
+        long sessionId, byte[] sessionPasswd)
+        throws IOException
+    {
+        super(host, sessionTimeout, watcher, sessionId, sessionPasswd);
+    }
+
+    /** Testing only! Drops the connection by delegating to {@code cnxn.disconnect()},
+     * i.e. the client goes away without sending a session disconnect (without
+     * ending the session cleanly). The server will eventually notice the
+     * client is no longer pinging and will time out the session.
+     */
+    public void disconnect() throws IOException {
+        cnxn.disconnect();
+    }
+
+    /**
+     * Prevent the client from automatically reconnecting if the connection to the
+     * server is lost, by reflectively setting the {@code closing} field of {@code cnxn} to true.
+     */
+    public void dontReconnect() throws Exception {
+        java.lang.reflect.Field f = cnxn.getClass().getDeclaredField("closing"); // looked up on cnxn's runtime class
+        f.setAccessible(true); // allow the write regardless of the field's declared access
+        f.setBoolean(cnxn, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
new file mode 100644
index 0000000..e81b7f4
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/DisconnectedWatcherTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DisconnectedWatcherTest extends ClientBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(DisconnectedWatcherTest.class);
+    final int TIMEOUT = 5000;
+
+    private class MyWatcher extends CountdownWatcher { // CountdownWatcher that also records watch events
+        LinkedBlockingQueue<WatchedEvent> events = // the tests poll() this queue with a timeout
+            new LinkedBlockingQueue<WatchedEvent>();
+        // Delegates to CountdownWatcher first, then queues every event whose type is not None.
+        public void process(WatchedEvent event) {
+            super.process(event);
+            if (event.getType() != Event.EventType.None) { // None-typed events are not queued
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put"); // NOTE(review): interrupt status is not restored
+                }
+            }
+        }
+    }
+
+    // @see jira issue ZOOKEEPER-961
+    
+    @Test // a child watch registered through a chrooted client must still fire after a server restart
+    public void testChildWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient(); // client without a chroot suffix; performs all the writes
+
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); // connect string carries the "/ch1" suffix
+        zk2.getChildren("/", true ); // true: watch using the watcher the client was created with
+
+        // created outside /ch1: this call shouldn't trigger any error or watch
+        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // new child of /ch1: this should trigger the watch
+        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e); // poll returns null if nothing arrived within TIMEOUT ms
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath()); // expected path is "/", not "/ch1"
+
+        MyWatcher childWatcher = new MyWatcher();
+        zk2.getChildren("/", childWatcher); // register again, this time with an explicit watcher
+        // Bounce the server; zk2's default watcher is used to wait for disconnect and reconnect.
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch that was registered before the restart
+        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+    }
+    
+    @Test // same scenario as the child-watcher test, but the watch is re-registered with the default watcher
+    public void testDefaultWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient(); // client without a chroot suffix; performs all the writes
+
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1"); // connect string carries the "/ch1" suffix
+        zk2.getChildren("/", true ); // true: watch using the watcher the client was created with
+
+        // created outside /ch1: this call shouldn't trigger any error or watch
+        zk1.create("/youdontmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // new child of /ch1: this should trigger the watch
+        zk1.create("/ch1/youshouldmatter1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e); // poll returns null if nothing arrived within TIMEOUT ms
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath()); // expected path is "/", not "/ch1"
+
+        zk2.getChildren("/", true ); // register the watch again before the server is bounced
+
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch that was registered before the restart
+        zk1.create("/ch1/youshouldmatter2", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS); // same watcher object as the first event
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/", e.getPath());
+    }
+    
+    @Test // as above, with a three-level suffix ("/ch1/here/we") and a watch on a node below it
+    public void testDeepChildWatcherAutoResetWithChroot() throws Exception {
+        ZooKeeper zk1 = createClient(); // client without a chroot suffix; performs all the writes
+        // Build the chain /ch1/here/we/are one level at a time.
+        zk1.create("/ch1", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here/we", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        zk1.create("/ch1/here/we/are", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher, hostPort + "/ch1/here/we"); // connect string carries the suffix
+        zk2.getChildren("/are", true ); // true: watch using the watcher the client was created with
+
+        // new child of /ch1/here/we/are: this should trigger the watch
+        zk1.create("/ch1/here/we/are/now", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        WatchedEvent e = watcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e); // poll returns null if nothing arrived within TIMEOUT ms
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/are", e.getPath()); // expected path omits the "/ch1/here/we" prefix
+
+        MyWatcher childWatcher = new MyWatcher();
+        zk2.getChildren("/are", childWatcher); // register again, this time with an explicit watcher
+        // Bounce the server; zk2's default watcher is used to wait for disconnect and reconnect.
+        stopServer();
+        watcher.waitForDisconnected(3000);
+        startServer();
+        watcher.waitForConnected(3000);
+
+        // this should trigger the watch that was registered before the restart
+        zk1.create("/ch1/here/we/are/again", null, Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+        Assert.assertNotNull(e);
+        Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+        Assert.assertEquals("/are", e.getPath());
+    }
+
+    // @see jira issue ZOOKEEPER-706. Test auto reset of a large number of
+    // watches which require multiple SetWatches calls.
+    @Test(timeout = 840000) // 840000 ms = 14 minutes
+    public void testManyChildWatchersAutoReset() throws Exception {
+        ZooKeeper zk1 = createClient(); // creates nodes and later triggers the watches
+
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk2 = createClient(watcher); // registers the watches; `watcher` only tracks its connection state here
+
+        // 110 character base path
+        String pathBase = "/long-path-000000000-111111111-222222222-333333333-444444444-"
+                          + "555555555-666666666-777777777-888888888-999999999";
+
+        zk1.create(pathBase, null, Ids.OPEN_ACL_UNSAFE,
+                   CreateMode.PERSISTENT);
+
+        // Create 10,000 nodes (each path longer than the 110-character base). This should
+        // ensure the length of our watches set below exceeds 1MB.
+        List<String> paths = new ArrayList<String>();
+        for (int i = 0; i < 10000; i++) {
+            String path = zk1.create(pathBase + "/ch-", null, Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT_SEQUENTIAL);
+            paths.add(path); // keep the actual path returned by create()
+        }
+        LOG.info("Created 10,000 nodes.");
+
+        MyWatcher childWatcher = new MyWatcher(); // single watcher shared by all three watch kinds
+
+        // Set a combination of child/exists/data watches, chosen by position modulo 3
+        int i = 0;
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk2.getChildren(path, childWatcher);
+            } else if (i % 3 == 1) {
+                zk2.exists(path + "/foo", childWatcher); // "/foo" is only created in the trigger loop below
+            } else if (i % 3 == 2) {
+                zk2.getData(path, childWatcher, null);
+            }
+
+            i++;
+        }
+        // Bounce the server; note the 30 s waits here versus 3 s in the chroot tests.
+        stopServer();
+        watcher.waitForDisconnected(30000);
+        startServer();
+        watcher.waitForConnected(30000);
+
+        // Trigger the watches and ensure they properly propagate to the client
+        i = 0; // restart the counter so each path gets the trigger matching its watch kind
+        for (String path : paths) {
+            if (i % 3 == 0) {
+                zk1.create(path + "/ch", null, Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT);
+                // child watch: expect NodeChildrenChanged on the parent path
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeChildrenChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            } else if (i % 3 == 1) {
+                zk1.create(path + "/foo", null, Ids.OPEN_ACL_UNSAFE,
+                           CreateMode.PERSISTENT);
+                // exists watch: expect NodeCreated on the newly created "/foo" path
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeCreated, e.getType());
+                Assert.assertEquals(path + "/foo", e.getPath());
+            } else if (i % 3 == 2) {
+                zk1.setData(path, new byte[]{1, 2, 3}, -1);
+                // data watch: expect NodeDataChanged on the node itself
+                WatchedEvent e = childWatcher.events.poll(TIMEOUT, TimeUnit.MILLISECONDS);
+                Assert.assertNotNull(e);
+                Assert.assertEquals(EventType.NodeDataChanged, e.getType());
+                Assert.assertEquals(path, e.getPath());
+            }
+
+            i++;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java
new file mode 100644
index 0000000..e6a9826
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.EnumSet;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class EventTypeTest extends ZKTestCase { // unit tests for EventType <-> int conversion
+    // Every EventType constant must round-trip through getIntValue()/fromInt().
+    @Test
+    public void testIntConversion() {
+        // Ensure that we can convert all valid integers to EventTypes
+        EnumSet<EventType> allTypes = EnumSet.allOf(EventType.class);
+
+        for(EventType et : allTypes) {
+            Assert.assertEquals(et, EventType.fromInt( et.getIntValue() ) );
+        }
+    }
+    // An integer that maps to no EventType must be rejected with a RuntimeException.
+    @Test
+    public void testInvalidIntConversion() {
+        try {
+            EventType.fromInt(324242); // result intentionally unused; the call itself must throw
+            Assert.fail("Was able to create an invalid EventType via an integer");
+        } catch(RuntimeException expected) {
+            // we're good: fromInt rejected the unknown value.
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
new file mode 100644
index 0000000..1731e6e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLENewEpochTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class);
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    ArrayList<LEThread> threads;
+    File tmpdir[];
+    int port[];
+    volatile int [] round;
+
+    Semaphore start0;
+    Semaphore finish3, finish0;
+
+    @Before // allocates fresh per-test state; no peers or threads are started here
+    public void setUp() throws Exception {
+        count = 3; // number of quorum peers used by the test
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        threads = new ArrayList<LEThread>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+        // Per-peer round counters, indexed by peer number; LEThread reads and updates them.
+        round = new int[3];
+        round[0] = 0;
+        round[1] = 0;
+        round[2] = 0;
+        // All semaphores start with zero permits, so every acquire blocks until a matching release.
+        start0 = new Semaphore(0);
+        finish0 = new Semaphore(0);
+        finish3 = new Semaphore(0);
+    }
+
+    @After // shuts down the election algorithm of every peer whose thread was registered
+    public void tearDown() throws Exception {
+        for (LEThread worker : threads) {
+            ((FastLeaderElection) worker.peer.getElectionAlg()).shutdown();
+        }
+    }
+
+
+    class LEThread extends Thread {
+        int i;
+        QuorumPeer peer;
+
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+
+        }
+
+        public void run(){
+            boolean flag = true;
+            try{
+                while(flag){
+                    Vote v = null;
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again: " + i);
+                    v = peer.getElectionAlg().lookForLeader();
+
+                    if (v == null){
+                        Assert.fail("Thread " + i + " got a null vote");
+                    }
+
+                    /*
+                     * A real zookeeper would take care of setting the current vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.getId());
+                    //votes[i] = v;
+
+                    switch (i) {
+                    case 0:
+                        LOG.info("First peer, do nothing, just join");
+                        if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){
+                        //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+                            LOG.info("Setting flag to false");
+                            flag = false;
+                        }
+                        break;
+                    case 1:
+                        LOG.info("Second entering case");
+                        if(round[1] != 0){
+                            finish0.release();
+                            flag = false;
+                        } else {
+                            finish3.acquire();
+                            start0.release();
+                        }
+                        LOG.info("Second is going to start second round");
+                        round[1]++;
+                        break;
+                    case 2:
+                        LOG.info("Third peer, shutting it down");
+                        QuorumBase.shutdown(peer);
+                        flag = false;
+                        round[2] = 1;
+                        finish3.release();
+                        LOG.info("Third leaving");
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+      @Test // peers 1 and 2 elect first, peer 2 shuts down, then peer 0 joins; all threads must finish
+      public void testLENewEpoch() throws Exception {
+
+          FastLeaderElection le[] = new FastLeaderElection[count]; // only its length (== count) is used below
+
+          LOG.info("TestLE: " + getTestName()+ ", " + count);
+          for(int i = 0; i < count; i++) {
+              peers.put(Long.valueOf(i),
+                        new QuorumServer(i, "0.0.0.0", PortAssignment.unique(),
+                                         PortAssignment.unique(), null));
+              tmpdir[i] = ClientBase.createTmpDir();
+              port[i] = PortAssignment.unique();
+          }
+          // Start peers 1 and 2 only (the loop begins at 1); peer 0 is started further down.
+          for(int i = 1; i < le.length; i++) {
+              QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+              peer.startLeaderElection();
+              LEThread thread = new LEThread(peer, i);
+              thread.start();
+              threads.add(thread);
+          }
+          if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS)) // released by peer 1's thread
+              Assert.fail("First leader election failed");
+          // Now bring up peer 0 with the same constructor arguments pattern as the others.
+          QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+          peer.startLeaderElection();
+          LEThread thread = new LEThread(peer, 0);
+          thread.start();
+          threads.add(thread);
+
+          LOG.info("Started threads " + getTestName());
+          // Give each thread up to 10 s to finish; a thread that is still alive fails the test.
+          for(int i = 0; i < threads.size(); i++) {
+              threads.get(i).join(10000);
+              if (threads.get(i).isAlive()) {
+                  Assert.fail("Threads didn't join");
+              }
+
+          }
+      }
+  }

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
new file mode 100644
index 0000000..0ecac6e
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FLEPredicateTest extends ZKTestCase { // checks the vote-ordering predicate of FastLeaderElection
+    
+    protected static final Logger LOG = LoggerFactory.getLogger(FLEPredicateTest.class);
+    // Subclass whose only purpose is to let the test call totalOrderPredicate.
+    class MockFLE extends FastLeaderElection {
+        MockFLE(QuorumPeer peer){
+            super(peer, peer.createCnxnManager());
+        }
+        // Straight pass-through; argument order is (newId, newZxid, newEpoch, curId, curZxid, curEpoch).
+        boolean predicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch){
+            return this.totalOrderPredicate(newId, newZxid, newEpoch, curId, curZxid, curEpoch);
+        }
+    }
+    
+    
+    HashMap<Long,QuorumServer> peers;
+    
+    @Test
+    public void testPredicate() throws IOException {
+        
+        peers = new HashMap<Long,QuorumServer>(3);
+        
+        /*
+         * Creates list of peers (ids 0, 1 and 2).
+         */
+        for(int i = 0; i < 3; i++) {
+            peers.put(Long.valueOf(i),
+                      new QuorumServer(i, "0.0.0.0", PortAssignment.unique(),
+                                       PortAssignment.unique(), null));
+        }
+
+        /*
+         * Creating peer; it is only used to construct the MockFLE and is never started.
+         */
+        try{
+            File tmpDir = ClientBase.createTmpDir();
+            QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir,
+                                        PortAssignment.unique(), 3, 0, 1000, 2, 2);
+        
+            MockFLE mock = new MockFLE(peer);
+            
+            /*
+             * Lower epoch must return false (new epoch 0 vs current epoch 2)
+             */
+            
+            Assert.assertFalse (mock.predicate(4L, 0L, 0L, 3L, 0L, 2L));
+            
+            /*
+             * Later epoch (new epoch 1 vs current epoch 0) must return true
+             */
+            Assert.assertTrue (mock.predicate(0L, 0L, 1L, 1L, 0L, 0L));
+        
+            /*
+             * Higher zxid (1 vs 0) with equal epochs must return true
+             */
+            Assert.assertTrue(mock.predicate(0L, 1L, 0L, 1L, 0L, 0L));
+        
+            /*
+             * Higher id (1 vs 0) with equal zxids and epochs must return true
+             */
+            Assert.assertTrue(mock.predicate(1L, 1L, 0L, 0L, 1L, 0L));
+        } catch (IOException e) {
+            LOG.error("Exception while creating quorum peer", e);
+            Assert.fail("Exception while creating quorum peer");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
new file mode 100644
index 0000000..a7cecf6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLERestartTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLERestartTest.class); // log under this class, not FLETest
+
+    static class TestVote { // minimal vote record: only the voted-for leader id is kept
+        TestVote(int id, long leader) { // `id` is accepted but not stored
+            this.leader = leader;
+        }
+
+        long leader;
+    }
+
+    int countVotes(HashSet<TestVote> hs, long id) {
+        int matches = 0;
+        for (TestVote vote : hs) {
+            matches += (vote.leader == id) ? 1 : 0;
+        }
+        // matches is now the number of votes in hs whose leader equals id
+        return matches;
+    }
+
+    int count;
+    //    int baseport;
+    //    int baseLEport;
+    HashMap<Long,QuorumServer> peers;
+    ArrayList<FLERestartThread> restartThreads;
+    HashMap<Integer, HashSet<TestVote> > voteMap;
+    File tmpdir[];
+    int port[];
+    int successCount;
+    Semaphore finish;
+
+    volatile Vote votes[];
+    volatile boolean leaderDies;
+    volatile long leader = -1;
+    //volatile int round = 1;
+    Random rand = new Random();
+
+    @Before // allocates fresh per-test state; no peers or threads are started here
+    public void setUp() throws Exception {
+        count = 3; // number of quorum peers used by the test
+
+        peers = new HashMap<Long,QuorumServer>(count);
+        restartThreads = new ArrayList<FLERestartThread>(count);
+        voteMap = new HashMap<Integer, HashSet<TestVote> >();
+        votes = new Vote[count];
+        tmpdir = new File[count];
+        port = new int[count];
+        successCount = 0;
+        finish = new Semaphore(0); // zero permits: acquire() blocks until peer 0 calls release(2)
+    }
+
+    @After // shuts down the election algorithm of every peer whose thread was registered
+    public void tearDown() throws Exception {
+        for (FLERestartThread worker : restartThreads) {
+            ((FastLeaderElection) worker.peer.getElectionAlg()).shutdown();
+        }
+    }
+
+    class FLERestartThread extends Thread {
+        int i;
+        QuorumPeer peer;
+        int peerRound = 0;
+
+        FLERestartThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+        }
+        public void run() {
+            try {
+                Vote v = null;
+                while(true) {
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again.");
+                    v = peer.getElectionAlg().lookForLeader();
+                    if(v == null){
+                        LOG.info("Thread " + i + " got a null vote");
+                        break;
+                    }
+
+                    /*
+                     * A real zookeeper would take care of setting the current vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.getId());
+                    //votes[i] = v;
+
+                    switch(i){
+                    case 0:
+                        if(peerRound == 0){
+                            LOG.info("First peer, shutting it down");
+                            QuorumBase.shutdown(peer);
+                            ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
+
+                            peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+                            peer.startLeaderElection();
+                            peerRound++;
+                        } else {
+                            finish.release(2);
+                            return;
+                        }
+
+                        break;
+                    case 1:
+                        LOG.info("Second entering case");
+                        finish.acquire();
+                        //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+                        LOG.info("Release");
+
+                        return;
+                    case 2:
+                        LOG.info("First peer, do nothing, just join");
+                        finish.acquire();
+                        //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){
+                        LOG.info("Release");
+
+                        return;
+                    }
+                }
+            } catch (Exception e){
+                e.printStackTrace();
+            }
+        }
+    }
+
+
+    @Test // starts three peers; peer 0 restarts after its first election; all threads must finish
+    public void testLERestart() throws Exception {
+
+        FastLeaderElection le[] = new FastLeaderElection[count]; // NOTE(review): never read in this method
+        leaderDies = true;
+        boolean allowOneBadLeader = leaderDies; // NOTE(review): never read in this method
+
+        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i),
+                      new QuorumServer(i, "0.0.0.0", PortAssignment.unique(),
+                                       PortAssignment.unique(), null));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = PortAssignment.unique();
+        }
+        // Start one election thread per peer; the thread is added to restartThreads after start().
+        for(int i = 0; i < count; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+            peer.startLeaderElection();
+            FLERestartThread thread = new FLERestartThread(peer, i);
+            thread.start();
+            restartThreads.add(thread);
+        }
+        LOG.info("Started threads " + getTestName());
+        for(int i = 0; i < restartThreads.size(); i++) {
+            restartThreads.get(i).join(10000); // wait up to 10 s per thread
+            if (restartThreads.get(i).isAlive()) {
+                Assert.fail("Threads didn't join");
+            }
+
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
new file mode 100644
index 0000000..6281a57
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLETest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(FLETest.class);
+    private FLETest.LEThread leThread;
+
+    /**
+     * A vote recorded in {@code voteMap}: which peer cast it and which server
+     * it names as leader.
+     */
+    static class TestVote {
+        /**
+         * @param id index of the peer casting the vote
+         * @param leader id of the server the vote names as leader
+         */
+        TestVote(int id, long leader) {
+            this.id = id;
+            this.leader = leader;
+        }
+
+        /** Index of the peer that cast this vote. */
+        final int id;
+        /** Id of the server this vote names as leader. */
+        long leader;
+    }
+
+    /**
+     * Tallies the votes in {@code hs} that name {@code id} as leader.
+     *
+     * @param hs votes to inspect
+     * @param id leader id to tally
+     * @return number of votes whose leader equals {@code id}
+     */
+    int countVotes(HashSet<TestVote> hs, long id) {
+        int matches = 0;
+        for (TestVote vote : hs) {
+            matches += (vote.leader == id) ? 1 : 0;
+        }
+        return matches;
+    }
+
+    // Number of peers taking part in the election; set in setUp().
+    int count;
+    // Server id -> QuorumServer entry; handed to every QuorumPeer constructor.
+    HashMap<Long,QuorumServer> peers;
+    // Election threads started by testLE(); their peers are shut down in tearDown().
+    ArrayList<LEThread> threads;
+    // Logical clock value -> votes recorded for that value. Also used as a monitor
+    // (synchronized / wait / notify) by LEThread.
+    HashMap<Integer, HashSet<TestVote> > voteMap;
+    // Per-peer directory and port passed to the QuorumPeer constructor, indexed by peer.
+    File tmpdir[];
+    int port[];
+    // Number of threads that ended up agreeing with the elected leader.
+    // Read and written under the finalObj monitor.
+    int successCount;
+    // Monitor guarding successCount and joinedThreads; notified once a majority has joined.
+    Object finalObj;
+
+    // Latest vote obtained by each thread, indexed by thread number.
+    volatile Vote votes[];
+    // While true, the next thread that finds itself elected shuts its election down and exits.
+    volatile boolean leaderDies;
+    // Index of the thread that gathered a majority, or -1 when there is none.
+    volatile long leader = -1;
+    //volatile int round = 1;
+    // Source of the random pause between election rounds.
+    Random rand = new Random();
+    // Indices of the threads that counted themselves into successCount.
+    Set<Long> joinedThreads;
+
+    /** Resets the per-test state: seven peers and empty bookkeeping structures. */
+    @Before
+    public void setUp() throws Exception {
+        count = 7;
+        successCount = 0;
+        finalObj = new Object();
+
+        // Per-peer arrays, indexed by peer number.
+        votes = new Vote[count];
+        tmpdir = new File[count];
+        port = new int[count];
+
+        // Collections that are filled in while a test runs.
+        peers = new HashMap<Long,QuorumServer>(count);
+        threads = new ArrayList<LEThread>(count);
+        voteMap = new HashMap<Integer, HashSet<TestVote> >();
+        joinedThreads = new HashSet<Long>();
+    }
+
+    /** Shuts down the quorum peer owned by every thread in {@code threads}. */
+    @After
+    public void tearDown() throws Exception {
+        for (LEThread started : threads) {
+            leThread = started;
+            QuorumBase.shutdown(leThread.peer);
+        }
+    }
+
+    /**
+     * Drives leader election for one peer. Each pass of the loop runs an
+     * election, records the resulting vote, and then either acts as the elected
+     * leader (counting supporters in {@code voteMap}) or as a follower (adding
+     * its vote and waiting for the leader to announce itself). The thread exits
+     * once it has agreed on a leader, when the election returns no vote, or when
+     * it is the leader chosen to die.
+     */
+    class LEThread extends Thread {
+        int i;
+        QuorumPeer peer;
+
+        /**
+         * @param peer quorum peer whose election this thread runs
+         * @param i index of this thread; compared against the elected server id
+         */
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+        }
+        public void run() {
+            try {
+                Vote v = null;
+                while(true) {
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election again.");
+                    v = peer.getElectionAlg().lookForLeader();
+                    if(v == null){
+                        LOG.info("Thread " + i + " got a null vote");
+                        break;
+                    }
+
+                    /*
+                     * A real zookeeper would take care of setting the current vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.getId());
+                    votes[i] = v;
+
+                    /*
+                     * Get the current value of the logical clock for this peer.
+                     */
+                    int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock();
+
+                    if (v.getId() == i) {
+                        /*
+                         * A leader executes this part of the code. If it is the first leader to be
+                         * elected, then it fails right after. Otherwise, it waits until it has enough
+                         * followers supporting it.
+                         */
+                        LOG.info("I'm the leader: " + i);
+                        synchronized(FLETest.this) {
+                            if (leaderDies) {
+                                LOG.info("Leader " + i + " dying");
+                                leaderDies = false;
+                                ((FastLeaderElection) peer.getElectionAlg()).shutdown();
+                                leader = -1;
+                                LOG.info("Leader " + i + " dead");
+
+                                FLETest.this.notifyAll();
+
+                                break;
+
+                            } else {
+                                synchronized(voteMap){
+                                    if(voteMap.get(lc) == null)
+                                        voteMap.put(lc, new HashSet<TestVote>());
+                                    HashSet<TestVote> hs = voteMap.get(lc);
+                                    hs.add(new TestVote(i, v.getId()));
+
+                                    if(countVotes(hs, v.getId()) > (count/2)){
+                                        leader = i;
+                                        LOG.info("Got majority: " + i);
+                                    } else {
+                                        // Give followers up to 3 seconds to add their votes.
+                                        voteMap.wait(3000);
+                                        LOG.info("Notified or expired: " + i);
+                                        hs = voteMap.get(lc);
+                                        if(countVotes(hs, v.getId()) > (count/2)){
+                                            leader = i;
+                                            LOG.info("Got majority: " + i);
+                                        }
+                                    }
+                                }
+                                FLETest.this.notifyAll();
+
+                                if(leader == i){
+                                    synchronized(finalObj){
+                                        successCount++;
+                                        joinedThreads.add((long)i);
+                                        if(successCount > (count/2)) finalObj.notify();
+                                    }
+
+                                    break;
+                                }
+                            }
+                        }
+                    } else {
+                        /*
+                         * Followers execute this part. They first add their vote to voteMap, and then
+                         * they wait for bounded amount of time. A leader notifies followers through the
+                         * FLETest.this object.
+                         *
+                         * Note that a follower must not take FLETest.this before voteMap when adding
+                         * its vote, otherwise it would be blocked out until the leader notifies
+                         * or leaves the synchronized block on FLETest.this.
+                         */
+
+
+                        LOG.info("Logical clock " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
+                        synchronized(voteMap){
+                            LOG.info("Voting on " + votes[i].getId() + ", round " + ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock());
+                            if(voteMap.get(lc) == null)
+                                voteMap.put(lc, new HashSet<TestVote>());
+                            HashSet<TestVote> hs = voteMap.get(lc);
+                            hs.add(new TestVote(i, votes[i].getId()));
+                            if(countVotes(hs, votes[i].getId()) > (count/2)){
+                                LOG.info("Logical clock: " + lc + ", " + votes[i].getId());
+                                voteMap.notify();
+                            }
+                        }
+
+                        /*
+                         * In this part a follower waits until the leader notifies it, and remove its
+                         * vote if the leader takes too long to respond.
+                         */
+                        synchronized(FLETest.this){
+                            if (leader != votes[i].getId()) FLETest.this.wait(3000);
+
+                            LOG.info("The leader: " + leader + " and my vote " + votes[i].getId());
+                            synchronized(voteMap){
+                                if (leader == votes[i].getId()) {
+                                    synchronized(finalObj){
+                                        successCount++;
+                                        joinedThreads.add((long)i);
+                                        if(successCount > (count/2)) finalObj.notify();
+                                    }
+                                    break;
+                                } else {
+                                    /*
+                                     * Withdraw one vote for the leader this follower backed.
+                                     * Votes are only ever tallied by leader id, so removing
+                                     * any entry naming that leader has the same effect on the
+                                     * count as removing this follower's own entry.
+                                     */
+                                    HashSet<TestVote> hs = voteMap.get(lc);
+                                    TestVote toRemove = null;
+                                    for(TestVote tv : hs){
+                                        if(tv.leader == v.getId()){
+                                            toRemove = tv;
+                                            break;
+                                        }
+                                    }
+                                    if(toRemove != null) hs.remove(toRemove);
+                                }
+                            }
+                        }
+                    }
+                    /*
+                     * Add some randomness to the execution.
+                     */
+                    Thread.sleep(rand.nextInt(500));
+                    peer.setCurrentVote(new Vote(peer.getId(), 0));
+                }
+                LOG.debug("Thread " + i + " votes " + v);
+            } catch (InterruptedException e) {
+                LOG.warn("Thread " + i + " interrupted during leader election", e);
+                // Restore the interrupt status instead of silently dropping it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs leader election across {@code count} peers with {@code leaderDies}
+     * set, then checks that more than half of the election threads agreed on a
+     * leader and that the leader itself is among the threads that joined.
+     */
+    @Test
+    public void testLE() throws Exception {
+        leaderDies = true;
+
+        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i),
+                      new QuorumServer(i, "0.0.0.0", PortAssignment.unique(),
+                                       PortAssignment.unique(), null));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = PortAssignment.unique();
+        }
+
+        for(int i = 0; i < count; i++) {
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
+                    port[i], 3, i, 1000, 2, 2);
+            peer.startLeaderElection();
+            LEThread thread = new LEThread(peer, i);
+            thread.start();
+            threads.add(thread);
+        }
+        LOG.info("Started threads " + getTestName());
+
+
+        // Wait for a majority to join: at most 50 waits of up to 2 seconds each.
+        int waitCounter = 0;
+        synchronized(finalObj){
+            while((successCount <= count/2) && (waitCounter < 50)){
+                finalObj.wait(2000);
+                waitCounter++;
+            }
+        }
+
+        /*
+         * Lists what threads haven't joined. A thread doesn't join if
+         * it hasn't decided upon a leader yet. It can happen that a
+         * peer is slow or disconnected, and it can take longer to
+         * nominate and connect to the current leader.
+         */
+        for (int i = 0; i < threads.size(); i++) {
+            if (threads.get(i).isAlive()) {
+                LOG.info("Threads didn't join: " + i);
+            }
+        }
+
+        /*
+         * successCount and joinedThreads are updated by the election threads
+         * under finalObj, so both checks read them under the same monitor.
+         */
+        synchronized(finalObj){
+            /*
+             * If we have a majority, then we are good to go.
+             */
+            if(successCount <= count/2){
+                Assert.fail("Fewer than a majority has joined");
+            }
+
+            if(!joinedThreads.contains(leader)){
+                Assert.fail("Leader hasn't joined: " + leader);
+            }
+        }
+    }
+
+    /*
+     * Class to verify if the thread has become a follower. It polls the peer
+     * every 250 ms and stops as soon as the peer reports FOLLOWING (success)
+     * or LEADING (failure), or when the polling thread is interrupted.
+     */
+    class VerifyState extends Thread {
+        volatile private boolean success = false;
+        QuorumPeer peer;
+        public VerifyState(QuorumPeer peer) {
+            this.peer = peer;
+        }
+        public void run() {
+            setName("VerifyState-" + peer.getId());
+            while (true) {
+                // Read the state once so both checks see the same value.
+                ServerState state = peer.getPeerState();
+                if(state == ServerState.FOLLOWING) {
+                    LOG.info("I am following");
+                    success = true;
+                    break;
+                } else if (state == ServerState.LEADING) {
+                    LOG.info("I am leading");
+                    success = false;
+                    break;
+                }
+                try {
+                    Thread.sleep(250);
+                } catch (InterruptedException e) {
+                    LOG.warn("Sleep failed ", e);
+                    // Stop polling and keep the interrupt status visible.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+        /* True only if the peer was observed in FOLLOWING state. */
+        public boolean isSuccess() {
+            return success;
+        }
+    }
+
+    /*
+     * For ZOOKEEPER-975 verify that a peer joining an established cluster
+     * does not go in LEADING state.
+     */
+    @Test
+    public void testJoin() throws Exception {
+        int sid;
+        QuorumPeer peer;
+        int waitTime = 10 * 1000;
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+        for(sid = 0; sid < 3; sid++) {
+            peers.put(Long.valueOf(sid),
+                      new QuorumServer(sid, "0.0.0.0", PortAssignment.unique(),
+                                       PortAssignment.unique(), null));
+            tmpdir[sid] = ClientBase.createTmpDir();
+            port[sid] = PortAssignment.unique();
+        }
+        try {
+            // start 2 peers and verify if they form the cluster
+            for (sid = 0; sid < 2; sid++) {
+                peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                                                 port[sid], 3, sid, 2000, 2, 2);
+                LOG.info("Starting peer " + peer.getId());
+                peer.start();
+                peerList.add(sid, peer);
+            }
+            VerifyState v1 = new VerifyState(peerList.get(0));
+            v1.start();
+            v1.join(waitTime);
+            Assert.assertTrue("Unable to form cluster in " +
+                waitTime + " ms",
+                v1.isSuccess());
+            // Start 3rd peer and check if it goes in LEADING state
+            // (sid is 2 here, left over from the loop above)
+            peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                     port[sid], 3, sid, 2000, 2, 2);
+            LOG.info("Starting peer " + peer.getId());
+            peer.start();
+            peerList.add(sid, peer);
+            v1 = new VerifyState(peer);
+            v1.start();
+            v1.join(waitTime);
+            if (v1.isAlive()) {
+                   Assert.fail("Peer " + peer.getId() + " failed to join the cluster " +
+                    "within " + waitTime + " ms");
+            } else if (!v1.isSuccess()) {
+                   Assert.fail("Incorrect LEADING state for peer " + peer.getId());
+            }
+        } finally {
+            // cleanup: runs on failure too, and only touches peers that were
+            // actually started
+            for (QuorumPeer started : peerList) {
+                started.shutdown();
+            }
+        }
+    }
+
+    /*
+     * For ZOOKEEPER-1732 verify that it is possible to join an ensemble with
+     * inconsistent election round information.
+     */
+    @Test
+    public void testJoinInconsistentEnsemble() throws Exception {
+        int sid;
+        QuorumPeer peer;
+        int waitTime = 10 * 1000;
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+        for(sid = 0; sid < 3; sid++) {
+            peers.put(Long.valueOf(sid),
+                      new QuorumServer(sid, "0.0.0.0", PortAssignment.unique(),
+                                       PortAssignment.unique(), null));
+            tmpdir[sid] = ClientBase.createTmpDir();
+            port[sid] = PortAssignment.unique();
+        }
+        try {
+            // start 2 peers and verify if they form the cluster
+            for (sid = 0; sid < 2; sid++) {
+                peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
+                                                 port[sid], 3, sid, 2000, 2, 2);
+                LOG.info("Starting peer " + peer.getId());
+                peer.start();
+                peerList.add(sid, peer);
+            }
+            peer = peerList.get(0);
+            VerifyState v1 = new VerifyState(peerList.get(0));
+            v1.start();
+            v1.join(waitTime);
+            Assert.assertTrue("Unable to form cluster in " +
+                waitTime + " ms",
+                v1.isSuccess());
+            // Change the election round for one of the members of the ensemble
+            long leaderSid = peer.getCurrentVote().getId();
+            long zxid = peer.getCurrentVote().getZxid();
+            long electionEpoch = peer.getCurrentVote().getElectionEpoch();
+            ServerState state = peer.getCurrentVote().getState();
+            long peerEpoch = peer.getCurrentVote().getPeerEpoch();
+            Vote newVote = new Vote(leaderSid, zxid+100, electionEpoch+100, peerEpoch, state);
+            peer.setCurrentVote(newVote);
+            // Start 3rd peer and check if it joins the quorum
+            peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2],
+                     port[2], 3, 2, 2000, 2, 2);
+            LOG.info("Starting peer " + peer.getId());
+            peer.start();
+            peerList.add(sid, peer);
+            v1 = new VerifyState(peer);
+            v1.start();
+            v1.join(waitTime);
+            if (v1.isAlive()) {
+                   Assert.fail("Peer " + peer.getId() + " failed to join the cluster " +
+                    "within " + waitTime + " ms");
+            }
+        } finally {
+            // cleanup: runs on failure too, and only touches peers that were
+            // actually started
+            for (QuorumPeer started : peerList) {
+                started.shutdown();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
new file mode 100644
index 0000000..e8a8cf7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FLEZeroWeightTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(FLEZeroWeightTest.class);
+
+    // Group/weight configuration loaded in setUp(); passed to each QuorumHierarchical.
+    Properties qp;
+
+    // Number of peers taking part in the election; set in setUp().
+    int count;
+    // Server id -> QuorumServer entry; handed to every QuorumPeer constructor.
+    HashMap<Long,QuorumServer> peers;
+    // Election threads started by the test; their peers are shut down in tearDown().
+    ArrayList<LEThread> threads;
+    // Per-peer directory and port passed to the QuorumPeer constructor, indexed by peer.
+    File tmpdir[];
+    int port[];
+    // Created in setUp(); not referenced anywhere else in this class.
+    Object finalObj;
+
+    // Latest vote obtained by each thread, indexed by thread number.
+    volatile Vote votes[];
+    // The three fields below are not referenced anywhere else in this class.
+    volatile boolean leaderDies;
+    volatile long leader = -1;
+    Random rand = new Random();
+
+
+    /**
+     * Prepares state for nine peers and loads the hierarchical quorum
+     * configuration: three groups of three servers, with weight 1 for
+     * servers 0-2 and weight 0 for servers 3-8.
+     */
+    @Before
+    public void setUp() throws Exception {
+        count = 9;
+
+        finalObj = new Object();
+        port = new int[count];
+        tmpdir = new File[count];
+        votes = new Vote[count];
+        threads = new ArrayList<LEThread>(count);
+        peers = new HashMap<Long,QuorumServer>(count);
+
+        String groups = "group.1=0:1:2\n" +
+        "group.2=3:4:5\n" +
+        "group.3=6:7:8\n";
+        String weights = "weight.0=1\n" +
+        "weight.1=1\n" +
+        "weight.2=1\n" +
+        "weight.3=0\n" +
+        "weight.4=0\n" +
+        "weight.5=0\n" +
+        "weight.6=0\n" +
+        "weight.7=0\n" +
+        "weight.8=0";
+
+        this.qp = new Properties();
+        qp.load(new ByteArrayInputStream((groups + weights).getBytes()));
+    }
+
+    /** Shuts down the quorum peer owned by every thread in {@code threads}. */
+    @After
+    public void tearDown() throws Exception {
+        // shutdown() has to be explicitly called for every thread to
+        // make sure that resources are freed properly and all fixed network ports
+        // are available for other test cases
+        for (LEThread started : threads) {
+            QuorumBase.shutdown(started.peer);
+        }
+    }
+
+    /**
+     * Runs leader election for one peer until the peer ends up FOLLOWING or
+     * LEADING, or until the election returns no vote. Sets {@code fail} when
+     * the peer is LEADING with an id greater than 2.
+     */
+    class LEThread extends Thread {
+        int i;
+        QuorumPeer peer;
+        // True once this peer was seen LEADING with an id greater than 2.
+        boolean fail;
+
+        /**
+         * @param peer quorum peer whose election this thread runs
+         * @param i index of this thread; used to index {@code votes}
+         */
+        LEThread(QuorumPeer peer, int i) {
+            this.i = i;
+            this.peer = peer;
+            LOG.info("Constructor: " + getName());
+        }
+
+        public void run() {
+            try {
+                Vote v = null;
+                fail = false;
+                while(true){
+
+                    peer.setPeerState(ServerState.LOOKING);
+                    LOG.info("Going to call leader election.");
+                    v = peer.getElectionAlg().lookForLeader();
+                    if(v == null){
+                        LOG.info("Thread " + i + " got a null vote");
+                        return;
+                    }
+
+                    /*
+                     * A real zookeeper would take care of setting the current vote. Here
+                     * we do it manually.
+                     */
+                    peer.setCurrentVote(v);
+
+                    LOG.info("Finished election: " + i + ", " + v.getId());
+                    votes[i] = v;
+
+                    // Read the state once so both checks see the same value.
+                    ServerState state = peer.getPeerState();
+
+                    if((state == ServerState.LEADING) &&
+                            (peer.getId() > 2)) fail = true;
+
+                    if((state == ServerState.FOLLOWING) ||
+                            (state == ServerState.LEADING)) break;
+                }
+                LOG.debug("Thread " + i + " votes " + v);
+            } catch (InterruptedException e) {
+                LOG.warn("Thread " + i + " interrupted during leader election", e);
+                // Restore the interrupt status instead of silently dropping it.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Runs leader election across {@code count} peers that share the
+     * hierarchical quorum configuration loaded in setUp(). Fails if any
+     * election thread is still alive after being waited on for up to 15
+     * seconds, or if any thread set its {@code fail} flag.
+     */
+    @Test
+    public void testZeroWeightQuorum() throws Exception {
+        LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count);
+        for(int i = 0; i < count; i++) {
+            peers.put(Long.valueOf(i),
+                      new QuorumServer(i, "0.0.0.0", PortAssignment.unique(), PortAssignment.unique(), null));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = PortAssignment.unique();
+        }
+
+        for(int i = 0; i < count; i++) {
+            QuorumHierarchical hq = new QuorumHierarchical(qp);
+            QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
+            peer.startLeaderElection();
+            LEThread thread = new LEThread(peer, i);
+            thread.start();
+            threads.add(thread);
+        }
+        LOG.info("Started threads " + getTestName());
+
+        for(int i = 0; i < threads.size(); i++) {
+            LEThread started = threads.get(i);
+            started.join(15000);
+            if (started.isAlive()) {
+                Assert.fail("Threads didn't join");
+            } else if (started.fail) {
+                Assert.fail("Elected zero-weight server");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/7291e47c/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
new file mode 100644
index 0000000..acd05d6
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class FollowerResyncConcurrencyTest extends ZKTestCase {
+    // Logger used by the tests and helpers in this class.
+    private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
+    // Alias of ClientTest.CONNECTION_TIMEOUT; used when waiting for a client to connect.
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    // Number of asynchronous create callbacks that have been invoked so far.
+    // NOTE: volatile only guarantees visibility; "counter++" is a
+    // read-modify-write and is not atomic when callbacks of different
+    // clients run concurrently.
+    private volatile int counter = 0;
+    // Number of asynchronous create callbacks that reported a non-zero rc.
+    private volatile int errors = 0;
+
+    /**
+     * See ZOOKEEPER-1319 - verify that a lagging follower resyncs correctly
+     *
+     * 1) start with down quorum
+     * 2) start leader/follower1, add some data
+     * 3) restart leader/follower1
+     * 4) start follower2
+     * 5) verify data consistency across the ensemble
+     *
+     * The ensemble is shut down in a finally block so that a failed
+     * assertion does not leave quorum peers running for later tests.
+     *
+     * @throws Exception if a server does not come up or down in time, or a
+     *         client operation fails
+     */
+    @Test
+    public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception {
+        CountdownWatcher watcher1 = new CountdownWatcher();
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        CountdownWatcher watcher3 = new CountdownWatcher();
+
+        QuorumUtil qu = new QuorumUtil(1);
+        try {
+            qu.shutdownAll();
+
+            qu.start(1);
+            qu.start(2);
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT));
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            ZooKeeper zk1 =
+                    createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
+            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+            final String resyncPath = "/resyncundernewepoch";
+            zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk1.close();
+
+            // Take both servers down and bring them back so that a new epoch starts.
+            qu.shutdown(1);
+            qu.shutdown(2);
+            Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:"
+                    + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT));
+            Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:"
+                    + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            qu.start(1);
+            qu.start(2);
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT));
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            // The third server never saw the create above and has to resync.
+            qu.start(3);
+            Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+                    + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT));
+
+            // watcher1 already served the first zk1 session. Reset it so the
+            // wait below applies to the new session.
+            // NOTE(review): assumes CountdownWatcher.reset() clears the
+            // "connected" state - confirm against ClientBase.
+            watcher1.reset();
+            zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1);
+            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+            assertNotNull("zk1 has data", zk1.exists(resyncPath, false));
+
+            final ZooKeeper zk2 =
+                    createClient(qu.getPeer(2).peer.getClientPort(), watcher2);
+            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
+
+            assertNotNull("zk2 has data", zk2.exists(resyncPath, false));
+
+            final ZooKeeper zk3 =
+                createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
+            assertNotNull("zk3 has data", zk3.exists(resyncPath, false));
+
+            zk1.close();
+            zk2.close();
+            zk3.close();
+        } finally {
+            // Runs on success and on failure so no peer is left behind.
+            qu.shutdownAll();
+        }
+    }
+
+    /**
+     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
+     * setting the ZXID of the SNAP packet
+     * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down
+     * The non-leader ZKs are writing to cluster
+     * Shut down F1 again
+     * Restart after sessions are expired, expect to get a snap file
+     * Shut down, run some transactions through.
+     * Restart to a diff while transactions are running in leader
+     *
+     * All asynchronous creates share one synchronized callback, because the
+     * callbacks of zk2 and zk3 are delivered by different client threads and
+     * the counters are plain volatile ints.
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    @Test
+    public void testResyncBySnapThenDiffAfterFollowerCrashes() 
+        throws IOException, InterruptedException, KeeperException,  Throwable
+    {
+        final Semaphore sem = new Semaphore(0);
+
+        // The loops below issue 13000 + 3000 + 260 = 16260 async creates; the
+        // semaphore is released once the 16200th callback has arrived.
+        // "synchronized" makes the read-modify-write of counter/errors atomic
+        // across the event threads of the different clients.
+        final AsyncCallback.StringCallback countingCallback = new AsyncCallback.StringCallback() {
+            @Override
+            public synchronized void processResult(int rc, String path, Object ctx, String name) {
+                counter++;
+                if (rc != 0) {
+                    errors++;
+                }
+                if(counter == 16200){
+                    sem.release();
+                }
+            }
+        };
+
+        QuorumUtil qu = new QuorumUtil(1);
+        try {
+            qu.startAll();
+            CountdownWatcher watcher1 = new CountdownWatcher();
+            CountdownWatcher watcher2 = new CountdownWatcher();
+            CountdownWatcher watcher3 = new CountdownWatcher();
+
+            int index = 1;
+            while(qu.getPeer(index).peer.leader == null) {
+                index++;
+            }
+
+            Leader leader = qu.getPeer(index).peer.leader;
+            assertNotNull(leader);
+
+            /* Reusing the index variable to select a follower to connect to */
+            index = (index == 1) ? 2 : 1;
+            LOG.info("Connecting to follower:" + index);
+
+            qu.shutdown(index);
+
+            final ZooKeeper zk3 =
+                createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
+            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+            qu.restart(index);
+            final ZooKeeper zk1 =
+                createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+            final ZooKeeper zk2 =
+                createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
+
+            zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+            // Background writer, started at i == 12000 in the loop below:
+            // 3000 creates, with a 100 ms pause after every tenth one.
+            Thread mytestfooThread = new Thread(new Runnable() {
+
+                @Override
+                public void run() {
+                    for(int i = 0; i < 3000; i++) {
+                        zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+                        if(i%10==0){
+                            try {
+                                Thread.sleep(100);
+                            } catch (InterruptedException e) {
+                                // Keep the interrupt status and stop issuing requests.
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                        }
+                    }
+                }
+            });
+
+            for(int i = 0; i < 13000; i++) {
+                zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+
+                if(i == 5000){
+                    qu.shutdown(index);
+                    LOG.info("Shutting down s1");
+                }
+                if(i == 12000){
+                    //Restart off of snap, then get some txns for a log, then shut down
+                    mytestfooThread.start();
+                    qu.restart(index);
+                    Thread.sleep(300);
+                    qu.shutdown(index);
+                    Thread.sleep(300);
+                    qu.restart(index);
+                    LOG.info("Setting up server: " + index);
+                }
+                if((i % 1000) == 0){
+                    Thread.sleep(1000);
+                }
+
+                if(i%50 == 0) {
+                    zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+                }
+            }
+
+            // Wait until all updates return
+            if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                LOG.warn("Did not aquire semaphore fast enough");
+            }
+            mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+            if (mytestfooThread.isAlive()) {
+                LOG.error("mytestfooThread is still alive");
+            }
+            Thread.sleep(1000);
+
+            verifyState(qu, index, leader);
+
+            zk1.close();
+            zk2.close();
+            zk3.close();
+        } finally {
+            // Shut the ensemble down even when an assertion above failed.
+            qu.shutdownAll();
+        }
+    }
+    
+    /**
+     * This test:
+     * Starts up 3 ZKs. The non-leader ZKs are writing to cluster
+     * Shut down one of the non-leader ZKs. 
+     * Restart after sessions have expired but <500 txns have taken place (get a diff)
+     * Shut down immediately after restarting, start running separate thread with other transactions
+     * Restart to a diff while transactions are running in leader
+     * 
+     * 
+     * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
+     * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
+     * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
+     * would be missed
+     * 
+     * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
+     * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
+     * during the leader's diff forwarding.
+     *
+     * All asynchronous creates share one synchronized callback, because the
+     * callbacks of zk2 and zk3 are delivered by different client threads and
+     * the counters are plain volatile ints.
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     * @throws Throwable
+     */
+
+    @Test
+    public void testResyncByDiffAfterFollowerCrashes() 
+        throws IOException, InterruptedException, KeeperException, Throwable
+    {
+        final Semaphore sem = new Semaphore(0);
+
+        // The loops below issue 5000 + 400 + 2000 = 7400 async creates; every
+        // callback after the 7300th releases a permit.
+        // "synchronized" makes the read-modify-write of counter/errors atomic
+        // across the event threads of the different clients.
+        final AsyncCallback.StringCallback countingCallback = new AsyncCallback.StringCallback() {
+            @Override
+            public synchronized void processResult(int rc, String path, Object ctx, String name) {
+                counter++;
+                if (rc != 0) {
+                    errors++;
+                }
+                if(counter > 7300){
+                    sem.release();
+                }
+            }
+        };
+
+        QuorumUtil qu = new QuorumUtil(1);
+        try {
+            qu.startAll();
+            CountdownWatcher watcher1 = new CountdownWatcher();
+            CountdownWatcher watcher2 = new CountdownWatcher();
+            CountdownWatcher watcher3 = new CountdownWatcher();
+
+            int index = 1;
+            while(qu.getPeer(index).peer.leader == null) {
+                index++;
+            }
+
+            Leader leader = qu.getPeer(index).peer.leader;
+            assertNotNull(leader);
+
+            /* Reusing the index variable to select a follower to connect to */
+            index = (index == 1) ? 2 : 1;
+            LOG.info("Connecting to follower:" + index);
+
+            final ZooKeeper zk1 =
+                createClient(qu.getPeer(index).peer.getClientPort(), watcher1);
+            LOG.info("zk1 has session id 0x" + Long.toHexString(zk1.getSessionId()));
+
+            final ZooKeeper zk2 =
+                createClient(qu.getPeer(index).peer.getClientPort(), watcher2);
+            LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
+
+            final ZooKeeper zk3 =
+                createClient(qu.getPeer(3).peer.getClientPort(), watcher3);
+            LOG.info("zk3 has session id 0x" + Long.toHexString(zk3.getSessionId()));
+
+            zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+            // Background writer: idles until runNow is set, then issues 400
+            // creates with a 10 ms pause after each one.
+            final AtomicBoolean runNow = new AtomicBoolean(false);
+            Thread mytestfooThread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    int inSyncCounter = 0;
+                    while(inSyncCounter < 400) {
+                        if(runNow.get()) {
+                            zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                    CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+
+                            try {
+                                Thread.sleep(10);
+                            } catch (InterruptedException e) {
+                                // Keep the interrupt status and stop issuing requests.
+                                Thread.currentThread().interrupt();
+                                return;
+                            }
+                            inSyncCounter++;
+                        } else {
+                            Thread.yield();
+                        }
+                    }
+                }
+            });
+
+            mytestfooThread.start();
+            for(int i = 0; i < 5000; i++) {
+                zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+
+                if(i == 1000){
+                    qu.shutdown(index);
+                    Thread.sleep(1100);
+                    LOG.info("Shutting down s1");
+                }
+                if(i == 1100 || i == 1150 || i == 1200) {
+                    Thread.sleep(1000);
+                }
+
+                if(i == 1200){
+                    qu.startThenShutdown(index);
+                    runNow.set(true);
+                    qu.restart(index);
+                    LOG.info("Setting up server: " + index);
+                }
+
+                if(i>=1000 &&  i%2== 0) {
+                    zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL_SEQUENTIAL, countingCallback, null);
+                }
+                if(i == 1050 || i == 1100 || i == 1150) {
+                    Thread.sleep(1000);
+                }
+            }
+
+            // Wait until all updates return
+            if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                LOG.warn("Did not aquire semaphore fast enough");
+            }
+            mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT);
+            if (mytestfooThread.isAlive()) {
+                LOG.error("mytestfooThread is still alive");
+            }
+
+            Thread.sleep(1000);
+            // Verify that server is following and has the same epoch as the leader
+
+            verifyState(qu, index, leader);
+
+            zk1.close();
+            zk2.close();
+            zk3.close();
+        } finally {
+            // Shut the ensemble down even when an assertion above failed.
+            qu.shutdownAll();
+        }
+    }
+
+    private static DisconnectableZooKeeper createClient(int port,
+            CountdownWatcher watcher)
+        throws IOException, TimeoutException, InterruptedException
+    {
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        return zk;
+    }
+
+    /**
+     * Convenience overload that connects a {@link TestableZooKeeper} to
+     * {@code hp} using a fresh {@link CountdownWatcher}.
+     *
+     * @param hp host:port string of the server to connect to
+     * @return the newly created client
+     */
+    private static TestableZooKeeper createTestableClient(String hp)
+        throws IOException, TimeoutException, InterruptedException
+    {
+        return createTestableClient(new CountdownWatcher(), hp);
+    }
+
+    private static TestableZooKeeper createTestableClient(
+            CountdownWatcher watcher, String hp)
+            throws IOException, TimeoutException, InterruptedException
+        {
+            TestableZooKeeper zk = new TestableZooKeeper(
+                    hp, ClientBase.CONNECTION_TIMEOUT, watcher);
+
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            return zk;
+        }
+
+    /**
+     * Asserts that the restarted peer {@code index} is following, that the
+     * epoch derived from its zxid matches the value derived from the leader,
+     * and that its sessions and per-session ephemerals agree with the other
+     * peers.
+     *
+     * @param qu the ensemble under test
+     * @param index id of the peer that was restarted during the test
+     * @param leader the leader captured before the restarts
+     */
+    private void verifyState(QuorumUtil qu, int index, Leader leader) {
+        assertNotNull("Not following", qu.getPeer(index).peer.follower);
+        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
+        long epochL = (leader.getEpoch() >> 32L);
+        assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + 
+                "Current epoch: " + epochF, epochF == epochL);
+        // NOTE(review): this picks "the other one of peers 1 and 2". The
+        // callers choose index the same way, so when peer 3 is the leader this
+        // is actually a follower and "clean" below is the leader.
+        int leaderIndex = (index == 1) ? 2 : 1;
+        Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
+        Collection<Long> sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions();
+
+        for(Long l : sessionsRestarted) {
+            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
+        }
+        assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
+        ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
+        ZKDatabase clean =  qu.getPeer(3).peer.getActiveServer().getZKDatabase();
+        ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
+        for(Long l : sessionsRestarted) {
+            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
+            // Wildcard types replace the raw HashSet; only contains() and
+            // size() are needed, so the element type does not matter here.
+            HashSet<?> ephemerals = restarted.getEphemerals(l);
+            HashSet<?> cleanEphemerals = clean.getEphemerals(l);
+            // Log (do not fail on) each missing node so the size assertions
+            // below come with context.
+            for(Object o : cleanEphemerals) {
+                if(!ephemerals.contains(o)) {
+                    LOG.info("Restarted follower doesn't contain ephemeral " + o);
+                }
+            }
+            HashSet<?> leadEphemerals = lead.getEphemerals(l);
+            for(Object o : leadEphemerals) {
+                if(!cleanEphemerals.contains(o)) {
+                    LOG.info("Follower doesn't contain ephemeral from leader " + o);
+                }
+            }
+            assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
+            assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size());
+        }
+    }
+
+    /**
+     * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412.
+     *
+     * A client connected to a follower must report a last zxid of 0 before
+     * any request and a positive one after a read. The ensemble is shut down
+     * in a finally block; previously it was never shut down at all.
+     *
+     * @throws Exception if the ensemble or the client cannot be started
+     */
+    @Test
+    public void testFollowerSendsLastZxid() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        try {
+            qu.startAll();
+
+            // Find the first peer that is acting as a follower.
+            int index = 1;
+            while(qu.getPeer(index).peer.follower == null) {
+                index++;
+            }
+            LOG.info("Connecting to follower:" + index);
+
+            TestableZooKeeper zk =
+                    createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort());
+
+            assertEquals(0L, zk.testableLastZxid());
+            zk.exists("/", false);
+            long lzxid = zk.testableLastZxid();
+            assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
+            zk.close();
+        } finally {
+            // Release the quorum peers started above.
+            qu.shutdownAll();
+        }
+    }
+
+    /**
+     * {@link CountdownWatcher} that also records every event whose type is
+     * not {@code None}, so a test can check whether a watch fired.
+     *
+     * Declared static because it uses no state of the enclosing test instance.
+     */
+    private static class MyWatcher extends CountdownWatcher {
+        // Events in arrival order; the test polls this queue.
+        final LinkedBlockingQueue<WatchedEvent> events =
+            new LinkedBlockingQueue<WatchedEvent>();
+
+        @Override
+        public void process(WatchedEvent event) {
+            // Let the base class handle the event first.
+            super.process(event);
+            if (event.getType() != Event.EventType.None) {
+                try {
+                    events.put(event);
+                } catch (InterruptedException e) {
+                    LOG.warn("ignoring interrupt during event.put");
+                }
+            }
+        }
+    }
+
+    /**
+     * Verify that the server is sending the proper zxid, and as a result
+     * the watch doesn't fire. See ZOOKEEPER-1412.
+     *
+     * The ensemble is shut down in a finally block; previously it was never
+     * shut down at all.
+     *
+     * @throws Exception if the ensemble or a client cannot be started
+     */
+    @Test
+    public void testFollowerWatcherResync() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        try {
+            qu.startAll();
+
+            // Find the first peer that is acting as a follower.
+            int index = 1;
+            while(qu.getPeer(index).peer.follower == null) {
+                index++;
+            }
+            LOG.info("Connecting to follower:" + index);
+
+            TestableZooKeeper zk1 = createTestableClient(
+                    "localhost:" + qu.getPeer(index).peer.getClientPort());
+            zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+
+            MyWatcher watcher = new MyWatcher();
+            TestableZooKeeper zk2 = createTestableClient(watcher,
+                    "localhost:" + qu.getPeer(index).peer.getClientPort());
+
+            zk2.exists("/foo", true);
+
+            // Force a connection loss and wait for the client to reconnect.
+            watcher.reset();
+            zk2.testableConnloss();
+            if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS))
+            {
+                fail("Unable to connect to server");
+            }
+            assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null));
+
+            // No watch event may arrive within five seconds.
+            assertNull(watcher.events.poll(5, TimeUnit.SECONDS));
+
+            zk1.close();
+            zk2.close();
+        } finally {
+            // Release the quorum peers started above.
+            qu.shutdownAll();
+        }
+    }
+
+}


Mime
View raw message