From commits-return-7262-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Wed Oct 24 11:31:03 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 8DACA1807A1 for ; Wed, 24 Oct 2018 11:30:58 +0200 (CEST) Received: (qmail 69200 invoked by uid 500); 24 Oct 2018 09:30:57 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 68829 invoked by uid 99); 24 Oct 2018 09:30:57 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2018 09:30:57 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 485D7E1188; Wed, 24 Oct 2018 09:30:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: andor@apache.org To: commits@zookeeper.apache.org Date: Wed, 24 Oct 2018 09:31:02 -0000 Message-Id: <83b2f93e4d724762a802b1b2cb983134@git.apache.org> In-Reply-To: <25d338a3641d4672b9e83437148d061f@git.apache.org> References: <25d338a3641d4672b9e83437148d061f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/51] [partial] zookeeper git commit: ZOOKEEPER-3032: MAVEN MIGRATION - branch-3.5 - zookeeper-server http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java new file mode 100644 index 0000000..5b2f8a4 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EmptiedSnapshotRecoveryTest.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.IOException; +import java.io.File; +import java.io.PrintWriter; +import java.util.List; +import java.util.LinkedList; + +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.server.quorum.Leader.Proposal; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.SyncRequestProcessor; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.junit.Assert; +import org.junit.Test; + +/** If snapshots are corrupted to the empty file or deleted, Zookeeper should + * not proceed to read its transactiong log files + * Test that zxid == -1 in the presence of emptied/deleted snapshots + */ +public class EmptiedSnapshotRecoveryTest extends ZKTestCase implements Watcher { + private static final Logger LOG = Logger.getLogger(RestoreCommittedLogTest.class); + private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); + private static final int CONNECTION_TIMEOUT = 3000; + private static final int N_TRANSACTIONS = 150; + private static final int SNAP_COUNT = 100; + + public void runTest(boolean leaveEmptyFile) throws Exception { + File tmpSnapDir = ClientBase.createTmpDir(); + File tmpLogDir = ClientBase.createTmpDir(); + ClientBase.setupTestEnv(); + ZooKeeperServer zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); + SyncRequestProcessor.setSnapCount(SNAP_COUNT); + final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]); + ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1); + f.startup(zks); + Assert.assertTrue("waiting for server being up ", + ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT)); + ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this); + try { + for (int i = 0; i< N_TRANSACTIONS; i++) { + zk.create("/node-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } + } finally { + zk.close(); + } + f.shutdown(); + zks.shutdown(); + Assert.assertTrue("waiting for server to shutdown", + ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT)); + + // start server again with intact database + zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); + zks.startdata(); + long zxid = zks.getZKDatabase().getDataTreeLastProcessedZxid(); + LOG.info("After clean restart, zxid = " + zxid); + Assert.assertTrue("zxid > 0", zxid > 0); + zks.shutdown(); + + // Make all snapshots empty + FileTxnSnapLog txnLogFactory = zks.getTxnLogFactory(); + List snapshots = txnLogFactory.findNRecentSnapshots(10); + Assert.assertTrue("We have a snapshot to corrupt", snapshots.size() > 0); + for (File file: snapshots) { + if (leaveEmptyFile) { + new PrintWriter(file).close (); + } else { + file.delete(); + } + } + + // start server again with corrupted database + zks = new ZooKeeperServer(tmpSnapDir, tmpLogDir, 3000); + try { + zks.startdata(); + zxid = zks.getZKDatabase().loadDataBase(); + Assert.fail("Should have gotten exception for corrupted database"); + } catch (IOException e) { + // expected behavior + } + zks.shutdown(); + } + + /** + * Test resilience to empty Snapshots + * @throws Exception an exception might be thrown here + */ + @Test + public void testRestoreWithEmptySnapFiles() throws Exception { + runTest(true); + } + + /** + * Test resilience to deletion of Snapshots + * @throws Exception an exception might be thrown here + */ + @Test + public void testRestoreWithNoSnapFiles() throws Exception { + runTest(false); + } + + public void process(WatchedEvent event) { + // do nothing + } + +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java new file mode 100644 index 0000000..0c96c83 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EventTypeTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.util.EnumSet; + +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.junit.Assert; +import org.junit.Test; + +public class EventTypeTest extends ZKTestCase { + + @Test + public void testIntConversion() { + // Ensure that we can convert all valid integers to EventTypes + EnumSet allTypes = EnumSet.allOf(EventType.class); + + for(EventType et : allTypes) { + Assert.assertEquals(et, EventType.fromInt( et.getIntValue() ) ); + } + } + + @Test + public void testInvalidIntConversion() { + try { + EventType.fromInt(324242); + Assert.fail("Was able to create an invalid EventType via an integer"); + } catch(RuntimeException re) { + // we're good. + } + + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java new file mode 100644 index 0000000..8bf365f --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLENewEpochTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class); + + int count; + HashMap peers; + ArrayList threads; + File tmpdir[]; + int port[]; + volatile int [] round; + + Semaphore start0; + Semaphore finish3, finish0; + + @Before + public void setUp() throws Exception { + count = 3; + + peers = new HashMap(count); + threads = new ArrayList(count); + tmpdir = new File[count]; + port = new int[count]; + + round = new int[3]; + round[0] = 0; + round[1] = 0; + round[2] = 0; + + start0 = new Semaphore(0); + finish0 = new Semaphore(0); + finish3 = new Semaphore(0); + } + + @After + public void tearDown() throws Exception { + for(int i = 0; i < threads.size(); i++) { + ((FastLeaderElection) threads.get(i).peer.getElectionAlg()).shutdown(); + } + } + + + class LEThread extends Thread { + int i; + QuorumPeer peer; + + LEThread(QuorumPeer peer, int i) { + this.i = i; + this.peer = peer; + LOG.info("Constructor: " + getName()); + + } + + public void run(){ + boolean flag = true; + try{ + while(flag){ + Vote v = null; + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election again: " + i); + v = peer.getElectionAlg().lookForLeader(); + + if (v == null){ + Assert.fail("Thread " + i + " got a null vote"); + } + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: " + i + ", " + v.getId()); + //votes[i] = v; + + switch (i) { + case 0: + LOG.info("First peer, do nothing, just join"); + if(finish0.tryAcquire(1000, java.util.concurrent.TimeUnit.MILLISECONDS)){ + //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ + LOG.info("Setting flag to false"); + flag = false; + } + break; + case 1: + LOG.info("Second entering case"); + if(round[1] != 0){ + finish0.release(); + flag = false; + } else { + finish3.acquire(); + start0.release(); + } + LOG.info("Second is going to start second round"); + round[1]++; + break; + case 2: + LOG.info("Third peer, shutting it down"); + QuorumBase.shutdown(peer); + flag = false; + round[2] = 1; + finish3.release(); + LOG.info("Third leaving"); + break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + + @Test + public void testLENewEpoch() throws Exception { + + LOG.info("TestLE: " + getTestName()+ ", " + count); + for(int i = 0; i < count; i++) { + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()))); + tmpdir[i] = ClientBase.createTmpDir(); + port[i] = PortAssignment.unique(); + } + + for(int i = 1; i < count; i++) { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); + peer.startLeaderElection(); + LEThread thread = new LEThread(peer, i); + thread.start(); + threads.add(thread); + } + if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS)) + Assert.fail("First leader election failed"); + + QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2); + peer.startLeaderElection(); + LEThread thread = new LEThread(peer, 0); + thread.start(); + threads.add(thread); + + LOG.info("Started threads " + getTestName()); + + for(int i = 0; i < threads.size(); i++) { + threads.get(i).join(10000); + if (threads.get(i).isAlive()) { + Assert.fail("Threads didn't join"); + } + + } + } + } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java new file mode 100644 index 0000000..bc43775 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zookeeper.test; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; + +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumCnxManager; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.junit.Assert; +import org.junit.Test; + +public class FLEPredicateTest extends ZKTestCase { + + protected static final Logger LOG = LoggerFactory.getLogger(FLEPredicateTest.class); + + class MockFLE extends FastLeaderElection { + MockFLE(QuorumPeer peer){ + super(peer, peer.createCnxnManager()); + } + + boolean predicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch){ + return this.totalOrderPredicate(newId, newZxid, newEpoch, curId, curZxid, curEpoch); + } + } + + + HashMap peers; + + @Test + public void testPredicate() throws IOException { + + peers = new HashMap(3); + + /* + * Creates list of peers. + */ + for(int i = 0; i < 3; i++) { + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()))); + } + + /* + * Creating peer. + */ + try{ + File tmpDir = ClientBase.createTmpDir(); + QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir, + PortAssignment.unique(), 3, 0, 1000, 2, 2); + + MockFLE mock = new MockFLE(peer); + mock.start(); + + /* + * Lower epoch must return false + */ + + Assert.assertFalse (mock.predicate(4L, 0L, 0L, 3L, 0L, 2L)); + + /* + * Later epoch + */ + Assert.assertTrue (mock.predicate(0L, 0L, 1L, 1L, 0L, 0L)); + + /* + * Higher zxid + */ + Assert.assertTrue(mock.predicate(0L, 1L, 0L, 1L, 0L, 0L)); + + /* + * Higher id + */ + Assert.assertTrue(mock.predicate(1L, 1L, 0L, 0L, 1L, 0L)); + } catch (IOException e) { + LOG.error("Exception while creating quorum peer", e); + Assert.fail("Exception while creating quorum peer"); + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java new file mode 100644 index 0000000..21e562b --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.concurrent.Semaphore; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLERestartTest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLETest.class); + + private int count; + private HashMap peers; + private ArrayList restartThreads; + private File tmpdir[]; + private int port[]; + private Semaphore finish; + + static class TestVote { + long leader; + + TestVote(int id, long leader) { + this.leader = leader; + } + } + + int countVotes(HashSet hs, long id) { + int counter = 0; + for(TestVote v : hs){ + if(v.leader == id) counter++; + } + + return counter; + } + + @Before + public void setUp() throws Exception { + count = 3; + peers = new HashMap(count); + restartThreads = new ArrayList(count); + tmpdir = new File[count]; + port = new int[count]; + finish = new Semaphore(0); + } + + @After + public void tearDown() throws Exception { + for(int i = 0; i < restartThreads.size(); i++) { + ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown(); + } + } + + class FLERestartThread extends Thread { + int i; + QuorumPeer peer; + int peerRound = 0; + + FLERestartThread(QuorumPeer peer, int i) { + this.i = i; + this.peer = peer; + LOG.info("Constructor: " + getName()); + } + public void run() { + try { + Vote v = null; + while(true) { + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election again."); + v = peer.getElectionAlg().lookForLeader(); + if(v == null){ + LOG.info("Thread " + i + " got a null vote"); + break; + } + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: " + i + ", " + v.getId()); + //votes[i] = v; + + switch(i){ + case 0: + if(peerRound == 0){ + LOG.info("First peer, shutting it down"); + QuorumBase.shutdown(peer); + ((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown(); + + peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); + peer.startLeaderElection(); + peerRound++; + } else { + finish.release(2); + return; + } + + break; + case 1: + LOG.info("Second entering case"); + finish.acquire(); + //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ + LOG.info("Release"); + + return; + case 2: + LOG.info("First peer, do nothing, just join"); + finish.acquire(); + //if(threads.get(0).peer.getPeerState() == ServerState.LEADING ){ + LOG.info("Release"); + + return; + } + } + } catch (Exception e){ + e.printStackTrace(); + } + } + } + + + @Test + public void testLERestart() throws Exception { + + LOG.info("TestLE: " + getTestName()+ ", " + count); + for(int i = 0; i < count; i++) { + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()))); + tmpdir[i] = ClientBase.createTmpDir(); + port[i] = PortAssignment.unique(); + } + + for(int i = 0; i < count; i++) { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2); + peer.startLeaderElection(); + FLERestartThread thread = new FLERestartThread(peer, i); + thread.start(); + restartThreads.add(thread); + } + LOG.info("Started threads " + getTestName()); + for(int i = 0; i < restartThreads.size(); i++) { + restartThreads.get(i).join(10000); + if (restartThreads.get(i).isAlive()) { + Assert.fail("Threads didn't join"); + } + + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java new file mode 100644 index 0000000..51bcecb --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java @@ -0,0 +1,536 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLETest extends ZKTestCase { + protected static final Logger LOG = LoggerFactory.getLogger(FLETest.class); + private final int MAX_LOOP_COUNTER = 300; + private FLETest.LEThread leThread; + + static class TestVote { + TestVote(int id, long leader) { + this.leader = leader; + } + + long leader; + } + + int countVotes(HashSet hs, long id) { + int counter = 0; + for(TestVote v : hs){ + if(v.leader == id) counter++; + } + + return counter; + } + + int count; + HashMap peers; + ArrayList threads; + HashMap > voteMap; + HashMap quora; + File tmpdir[]; + int port[]; + int successCount; + + volatile Vote votes[]; + volatile long leader = -1; + //volatile int round = 1; + Random rand = new Random(); + Set joinedThreads; + + @Before + public void setUp() throws Exception { + count = 7; + + peers = new HashMap(count); + threads = new ArrayList(count); + voteMap = new HashMap >(); + votes = new Vote[count]; + tmpdir = new File[count]; + port = new int[count]; + successCount = 0; + joinedThreads = new HashSet(); + } + + @After + public void tearDown() throws Exception { + for (int i = 0; i < threads.size(); i++) { + leThread = threads.get(i); + QuorumBase.shutdown(leThread.peer); + } + } + + + /** + * Implements the behavior of a peer during the leader election rounds + * of tests. + */ + class LEThread extends Thread { + FLETest self; + int i; + QuorumPeer peer; + int totalRounds; + ConcurrentHashMap > quora; + + LEThread(FLETest self, QuorumPeer peer, int i, int rounds, ConcurrentHashMap > quora) { + this.self = self; + this.i = i; + this.peer = peer; + this.totalRounds = rounds; + this.quora = quora; + + LOG.info("Constructor: " + getName()); + } + + public void run() { + try { + Vote v = null; + while(true) { + + /* + * Set the state of the peer to LOOKING and look for leader + */ + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election again."); + v = peer.getElectionAlg().lookForLeader(); + if(v == null){ + LOG.info("Thread " + i + " got a null vote"); + break; + } + + /* + * Done with the election round, so now we set the vote in + * the peer. A real zookeeper would take care of setting the + * current vote. Here we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: " + i + ", " + v.getId()); + votes[i] = v; + + /* + * Get the current value of the logical clock for this peer + * so that we know in which round this peer has executed. + */ + int lc = (int) ((FastLeaderElection) peer.getElectionAlg()).getLogicalClock(); + + /* + * The leader executes the following block, which essentially shuts down + * the peer if it is not the last round. + */ + if (v.getId() == i) { + LOG.info("I'm the leader: " + i); + if (lc < this.totalRounds) { + LOG.info("Leader " + i + " dying"); + FastLeaderElection election = + (FastLeaderElection) peer.getElectionAlg(); + election.shutdown(); + // Make sure the vote is reset to -1 after shutdown. + Assert.assertEquals(-1, election.getVote().getId()); + LOG.info("Leader " + i + " dead"); + + break; + } + } + + /* + * If the peer has done enough rounds, then consider joining. The thread + * will only join if it is part of a quorum supporting the current + * leader. Otherwise it will try again. + */ + if (lc >= this.totalRounds) { + /* + * quora keeps the supporters of a given leader, so + * we first update it with the vote of this peer. + */ + if(quora.get(v.getId()) == null) quora.put(v.getId(), new HashSet()); + quora.get(v.getId()).add(i); + + /* + * we now wait until a quorum supports the same leader. + */ + if(waitForQuorum(v.getId())){ + synchronized(self){ + + /* + * Assert that the state of the thread is the one expected. + */ + if(v.getId() == i){ + Assert.assertTrue("Wrong state" + peer.getPeerState(), + peer.getPeerState() == ServerState.LEADING); + leader = i; + } else { + Assert.assertTrue("Wrong state" + peer.getPeerState(), + peer.getPeerState() == ServerState.FOLLOWING); + } + + /* + * Global variable keeping track of + * how many peers have successfully + * joined. + */ + successCount++; + joinedThreads.add((long)i); + self.notify(); + } + + /* + * I'm done so joining. + */ + break; + } else { + quora.get(v.getId()).remove(i); + } + } + + /* + * This sleep time represents the time a follower + * would take to declare the leader dead and start + * a new leader election. + */ + Thread.sleep(100); + + } + LOG.debug("Thread " + i + " votes " + v); + } catch (InterruptedException e) { + Assert.fail(e.toString()); + } + } + + /** + * Auxiliary method to make sure that enough followers terminated. + * + * @return boolean followers successfully joined. + */ + boolean waitForQuorum(long id) + throws InterruptedException { + int loopCounter = 0; + while((quora.get(id).size() <= count/2) && (loopCounter < MAX_LOOP_COUNTER)){ + Thread.sleep(100); + loopCounter++; + } + + if((loopCounter >= MAX_LOOP_COUNTER) && (quora.get(id).size() <= count/2)){ + return false; + } else { + return true; + } + } + + } + + + + @Test + public void testSingleElection() throws Exception { + try{ + runElection(1); + } catch (Exception e) { + Assert.fail(e.toString()); + } + } + + + @Test + public void testDoubleElection() throws Exception { + try{ + runElection(2); + } catch (Exception e) { + Assert.fail(e.toString()); + } + } + + @Test + public void testTripleElection() throws Exception { + try{ + runElection(3); + } catch (Exception e) { + Assert.fail(e.toString()); + } + } + + /** + * Test leader election for a number of rounds. In all rounds but the last one + * we kill the leader. + * + * @param rounds + * @throws Exception + */ + private void runElection(int rounds) throws Exception { + ConcurrentHashMap > quora = + new ConcurrentHashMap >(); + + LOG.info("TestLE: " + getTestName()+ ", " + count); + + /* + * Creates list of peers. + */ + for(int i = 0; i < count; i++) { + port[i] = PortAssignment.unique(); + peers.put(Long.valueOf(i), + new QuorumServer(i, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", port[i]))); + tmpdir[i] = ClientBase.createTmpDir(); + } + + /* + * Start one LEThread for each peer we want to run. + */ + for(int i = 0; i < count; i++) { + QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], + port[i], 3, i, 1000, 2, 2); + peer.startLeaderElection(); + LEThread thread = new LEThread(this, peer, i, rounds, quora); + thread.start(); + threads.add(thread); + } + LOG.info("Started threads " + getTestName()); + + int waitCounter = 0; + synchronized(this){ + while(((successCount <= count/2) || (leader == -1)) + && (waitCounter < MAX_LOOP_COUNTER)) + { + this.wait(200); + waitCounter++; + } + } + LOG.info("Success count: " + successCount); + + /* + * Lists what threads haven't joined. A thread doesn't join if + * it hasn't decided upon a leader yet. It can happen that a + * peer is slow or disconnected, and it can take longer to + * nominate and connect to the current leader. + */ + for (int i = 0; i < threads.size(); i++) { + if (threads.get(i).isAlive()) { + LOG.info("Threads didn't join: " + i); + } + } + + /* + * If we have a majority, then we are good to go. + */ + if(successCount <= count/2){ + Assert.fail("Fewer than a a majority has joined"); + } + + /* + * I'm done so joining. + */ + if(!joinedThreads.contains(leader)){ + Assert.fail("Leader hasn't joined: " + leader); + } + } + + + /* + * Class to verify of the thread has become a follower + */ + static class VerifyState extends Thread { + volatile private boolean success = false; + private QuorumPeer peer; + public VerifyState(QuorumPeer peer) { + this.peer = peer; + } + public void run() { + setName("VerifyState-" + peer.getId()); + while (true) { + if(peer.getPeerState() == ServerState.FOLLOWING) { + LOG.info("I am following"); + success = true; + break; + } else if (peer.getPeerState() == ServerState.LEADING) { + LOG.info("I am leading"); + success = false; + break; + } + try { + Thread.sleep(250); + } catch (Exception e) { + LOG.warn("Sleep failed ", e); + } + } + } + public boolean isSuccess() { + return success; + } + } + + /* + * For ZOOKEEPER-975 verify that a peer joining an established cluster + * does not go in LEADING state. + */ + @Test + public void testJoin() throws Exception { + int sid; + QuorumPeer peer; + int waitTime = 10 * 1000; + ArrayList peerList = new ArrayList(); + for(sid = 0; sid < 3; sid++) { + port[sid] = PortAssignment.unique(); + peers.put(Long.valueOf(sid), + new QuorumServer(sid, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", port[sid]))); + tmpdir[sid] = ClientBase.createTmpDir(); + } + // start 2 peers and verify if they form the cluster + for (sid = 0; sid < 2; sid++) { + peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], + port[sid], 3, sid, 2000, 2, 2); + LOG.info("Starting peer " + peer.getId()); + peer.start(); + peerList.add(sid, peer); + } + peer = peerList.get(0); + VerifyState v1 = new VerifyState(peerList.get(0)); + v1.start(); + v1.join(waitTime); + Assert.assertFalse("Unable to form cluster in " + + waitTime + " ms", + !v1.isSuccess()); + // Start 3rd peer and check if it goes in LEADING state + peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], + port[sid], 3, sid, 2000, 2, 2); + LOG.info("Starting peer " + peer.getId()); + peer.start(); + peerList.add(sid, peer); + v1 = new VerifyState(peer); + v1.start(); + v1.join(waitTime); + if (v1.isAlive()) { + Assert.fail("Peer " + peer.getId() + " failed to join the cluster " + + "within " + waitTime + " ms"); + } else if (!v1.isSuccess()) { + Assert.fail("Incorrect LEADING state for peer " + peer.getId()); + } + // cleanup + for (int id = 0; id < 3; id++) { + peer = peerList.get(id); + if (peer != null) { + peer.shutdown(); + } + } + } + + /* + * For ZOOKEEPER-1732 verify that it is possible to join an ensemble with + * inconsistent election round information. + */ + @Test + public void testJoinInconsistentEnsemble() throws Exception { + int sid; + QuorumPeer peer; + int waitTime = 10 * 1000; + ArrayList peerList = new ArrayList(); + for(sid = 0; sid < 3; sid++) { + peers.put(Long.valueOf(sid), + new QuorumServer(sid, + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()), + new InetSocketAddress( + "127.0.0.1", PortAssignment.unique()))); + tmpdir[sid] = ClientBase.createTmpDir(); + port[sid] = PortAssignment.unique(); + } + // start 2 peers and verify if they form the cluster + for (sid = 0; sid < 2; sid++) { + peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid], + port[sid], 3, sid, 2000, 2, 2); + LOG.info("Starting peer " + peer.getId()); + peer.start(); + peerList.add(sid, peer); + } + peer = peerList.get(0); + VerifyState v1 = new VerifyState(peerList.get(0)); + v1.start(); + v1.join(waitTime); + Assert.assertFalse("Unable to form cluster in " + + waitTime + " ms", + !v1.isSuccess()); + // Change the election round for one of the members of the ensemble + long leaderSid = peer.getCurrentVote().getId(); + long zxid = peer.getCurrentVote().getZxid(); + long electionEpoch = peer.getCurrentVote().getElectionEpoch(); + ServerState state = peer.getCurrentVote().getState(); + long peerEpoch = peer.getCurrentVote().getPeerEpoch(); + Vote newVote = new Vote(leaderSid, zxid+100, electionEpoch+100, peerEpoch, state); + peer.setCurrentVote(newVote); + // Start 3rd peer and check if it joins the quorum + peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], + port[2], 3, 2, 2000, 2, 2); + LOG.info("Starting peer " + peer.getId()); + peer.start(); + peerList.add(sid, peer); + v1 = new VerifyState(peer); + v1.start(); + v1.join(waitTime); + if (v1.isAlive()) { + Assert.fail("Peer " + peer.getId() + " failed to join the cluster " + + "within " + waitTime + " ms"); + } + // cleanup + for (int id = 0; id < 3; id++) { + peer = peerList.get(id); + if (peer != null) { + peer.shutdown(); + } + } + } + + @Test + public void testElectionTimeUnit() throws Exception { + Assert.assertEquals("MS", QuorumPeer.FLE_TIME_UNIT); + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java new file mode 100644 index 0000000..1ff089c --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.zookeeper.PortAssignment; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.server.quorum.FastLeaderElection; +import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.Vote; +import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; +import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FLEZeroWeightTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(HierarchicalQuorumTest.class); + + Properties qp; + + int count; + HashMap peers; + ArrayList threads; + File tmpdir[]; + int port[]; + + volatile Vote votes[]; + + @Before + public void setUp() throws Exception { + count = 9; + + peers = new HashMap(count); + threads = new ArrayList(count); + votes = new Vote[count]; + tmpdir = new File[count]; + port = new int[count]; + + String config = "group.1=0:1:2\n" + + "group.2=3:4:5\n" + + "group.3=6:7:8\n" + + "weight.0=1\n" + + "weight.1=1\n" + + "weight.2=1\n" + + "weight.3=0\n" + + "weight.4=0\n" + + "weight.5=0\n" + + "weight.6=0\n" + + "weight.7=0\n" + + "weight.8=0"; + + ByteArrayInputStream is = new ByteArrayInputStream(config.getBytes()); + this.qp = new Properties(); + qp.load(is); + } + + @After + public void tearDown() throws Exception { + for(int i = 0; i < threads.size(); i++) { + LEThread leThread = threads.get(i); + // shutdown() has to be explicitly called for every thread to + // make sure that resources are freed properly and all fixed network ports + // are available for other test cases + QuorumBase.shutdown(leThread.peer); + } + } + + class LEThread extends Thread { + int i; + QuorumPeer peer; + boolean fail; + + LEThread(QuorumPeer peer, int i) { + this.i = i; + this.peer = peer; + LOG.info("Constructor: " + getName()); + } + + public void run() { + try { + Vote v = null; + fail = false; + while(true){ + + //while(true) { + peer.setPeerState(ServerState.LOOKING); + LOG.info("Going to call leader election."); + v = peer.getElectionAlg().lookForLeader(); + if(v == null){ + LOG.info("Thread " + i + " got a null vote"); + return; + } + + /* + * A real zookeeper would take care of setting the current vote. Here + * we do it manually. + */ + peer.setCurrentVote(v); + + LOG.info("Finished election: " + i + ", " + v.getId()); + votes[i] = v; + + if((peer.getPeerState() == ServerState.LEADING) && + (peer.getId() > 2)) fail = true; + + if((peer.getPeerState() == ServerState.FOLLOWING) || + (peer.getPeerState() == ServerState.LEADING)) break; + } + LOG.debug("Thread " + i + " votes " + v); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + @Test + public void testZeroWeightQuorum() throws Exception { + LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count); + for(int i = 0; i < count; i++) { + InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); + InetSocketAddress addr2 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); + InetSocketAddress addr3 = new InetSocketAddress("127.0.0.1",PortAssignment.unique()); + port[i] = addr3.getPort(); + qp.setProperty("server."+i, "127.0.0.1:"+addr1.getPort()+":"+addr2.getPort()+";"+port[i]); + peers.put(Long.valueOf(i), new QuorumServer(i, addr1, addr2, addr3)); + tmpdir[i] = ClientBase.createTmpDir(); + } + + for(int i = 0; i < count; i++) { + QuorumHierarchical hq = new QuorumHierarchical(qp); + QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq); + peer.startLeaderElection(); + LEThread thread = new LEThread(peer, i); + thread.start(); + threads.add(thread); + } + LOG.info("Started threads " + getTestName()); + + for(int i = 0; i < threads.size(); i++) { + threads.get(i).join(15000); + if (threads.get(i).isAlive()) { + Assert.fail("Threads didn't join"); + } else { + if(threads.get(i).fail) + Assert.fail("Elected zero-weight server"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/43d71c2e/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java new file mode 100644 index 0000000..5086711 --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java @@ -0,0 +1,748 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.TestableZooKeeper; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.quorum.Leader; +import org.apache.zookeeper.test.ClientBase.CountdownWatcher; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FollowerResyncConcurrencyTest extends ZKTestCase { + private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class); + public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT; + + private AtomicInteger counter = new AtomicInteger(0); + private AtomicInteger errors = new AtomicInteger(0); + /** + * Keep track of pending async operations, we shouldn't start verifying + * the state until pending operation is 0 + */ + private AtomicInteger pending = new AtomicInteger(0); + + @Before + public void setUp() throws Exception { + pending.set(0); + errors.set(0); + counter.set(0); + } + + @After + public void tearDown() throws Exception { + LOG.info("Error count {}" , errors.get()); + } + + /** + * See ZOOKEEPER-1319 - verify that a lagging follwer resyncs correctly + * + * 1) start with down quorum + * 2) start leader/follower1, add some data + * 3) restart leader/follower1 + * 4) start follower2 + * 5) verify data consistency across the ensemble + * + * @throws Exception + */ + @Test + public void testLaggingFollowerResyncsUnderNewEpoch() throws Exception { + CountdownWatcher watcher1 = new CountdownWatcher(); + CountdownWatcher watcher2 = new CountdownWatcher(); + CountdownWatcher watcher3 = new CountdownWatcher(); + + QuorumUtil qu = new QuorumUtil(1); + qu.shutdownAll(); + + qu.start(1); + qu.start(2); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); + + ZooKeeper zk1 = + createClient(qu.getPeer(1).peer.getClientPort(), watcher1); + LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); + + final String resyncPath = "/resyncundernewepoch"; + zk1.create(resyncPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.close(); + + qu.shutdown(1); + qu.shutdown(2); + Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" + + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); + Assert.assertTrue("Waiting for server down", ClientBase.waitForServerDown("127.0.0.1:" + + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); + + qu.start(1); + qu.start(2); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + qu.getPeer(1).clientPort, ClientBase.CONNECTION_TIMEOUT)); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + qu.getPeer(2).clientPort, ClientBase.CONNECTION_TIMEOUT)); + + qu.start(3); + Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:" + + qu.getPeer(3).clientPort, ClientBase.CONNECTION_TIMEOUT)); + + zk1 = createClient(qu.getPeer(1).peer.getClientPort(), watcher1); + LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); + + assertNotNull("zk1 has data", zk1.exists(resyncPath, false)); + + final ZooKeeper zk2 = + createClient(qu.getPeer(2).peer.getClientPort(), watcher2); + LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); + + assertNotNull("zk2 has data", zk2.exists(resyncPath, false)); + + final ZooKeeper zk3 = + createClient(qu.getPeer(3).peer.getClientPort(), watcher3); + LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); + + assertNotNull("zk3 has data", zk3.exists(resyncPath, false)); + + zk1.close(); + zk2.close(); + zk3.close(); + + qu.shutdownAll(); + } + + /** + * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this, + * setting the ZXID of the SNAP packet + * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down + * The non-leader ZKs are writing to cluster + * Shut down F1 again + * Restart after sessions are expired, expect to get a snap file + * Shut down, run some transactions through. + * Restart to a diff while transactions are running in leader + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testResyncBySnapThenDiffAfterFollowerCrashes() + throws IOException, InterruptedException, KeeperException, Throwable + { + followerResyncCrashTest(false); + } + + /** + * Same as testResyncBySnapThenDiffAfterFollowerCrashes() but we resync + * follower using txnlog + * + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testResyncByTxnlogThenDiffAfterFollowerCrashes() + throws IOException, InterruptedException, KeeperException, Throwable + { + followerResyncCrashTest(true); + } + + public void followerResyncCrashTest(boolean useTxnLogResync) + throws IOException, InterruptedException, KeeperException, Throwable + { + final Semaphore sem = new Semaphore(0); + + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + CountdownWatcher watcher1 = new CountdownWatcher(); + CountdownWatcher watcher2 = new CountdownWatcher(); + CountdownWatcher watcher3 = new CountdownWatcher(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) { + index++; + } + + Leader leader = qu.getPeer(index).peer.leader; + assertNotNull(leader); + + if (useTxnLogResync) { + // Set the factor to high value so that this test case always + // resync using txnlog + qu.getPeer(index).peer.getActiveServer().getZKDatabase() + .setSnapshotSizeFactor(1000); + } else { + // Disable sending DIFF using txnlog, so that this test still + // testing the ZOOKEEPER-962 bug + qu.getPeer(index).peer.getActiveServer().getZKDatabase() + .setSnapshotSizeFactor(-1); + } + + /* Reusing the index variable to select a follower to connect to */ + index = (index == 1) ? 2 : 1; + LOG.info("Connecting to follower: {}", index); + + qu.shutdown(index); + + final ZooKeeper zk3 = + createClient(qu.getPeer(3).peer.getClientPort(), watcher3); + LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); + + zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + qu.restart(index); + + final ZooKeeper zk1 = + createClient(qu.getPeer(index).peer.getClientPort(), watcher1); + LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); + + final ZooKeeper zk2 = + createClient(qu.getPeer(index).peer.getClientPort(), watcher2); + LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); + + zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + // Prepare a thread that will create znodes. + Thread mytestfooThread = new Thread(new Runnable() { + @Override + public void run() { + for(int i = 0; i < 3000; i++) { + // Here we create 3000 znodes + zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet(); + } + if(counter.get() == 16200){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + if(i%10==0){ + try { + Thread.sleep(100); + } catch (Exception e) { + + } + } + } + + } + }); + + // Here we start populating the server and shutdown the follower after + // initial data is written. + for(int i = 0; i < 13000; i++) { + // Here we create 13000 znodes + zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet(); + } + if(counter.get() == 16200){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + + if(i == 5000){ + qu.shutdown(index); + LOG.info("Shutting down s1"); + } + if(i == 12000){ + // Start the prepared thread so that it is writing znodes while + // the follower is restarting. On the first restart, the follow + // should use txnlog to catchup. For subsequent restart, the + // follower should use a diff to catchup. + mytestfooThread.start(); + LOG.info("Restarting follower: {}", index); + qu.restart(index); + Thread.sleep(300); + LOG.info("Shutdown follower: {}", index); + qu.shutdown(index); + Thread.sleep(300); + LOG.info("Restarting follower: {}", index); + qu.restart(index); + LOG.info("Setting up server: {}", index); + } + if((i % 1000) == 0){ + Thread.sleep(1000); + } + + if(i%50 == 0) { + zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet(); + } + if(counter.get() == 16200){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + } + } + + // Wait until all updates return + if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { + LOG.warn("Did not aquire semaphore fast enough"); + } + mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT); + if (mytestfooThread.isAlive()) { + LOG.error("mytestfooThread is still alive"); + } + assertTrue(waitForPendingRequests(60)); + assertTrue(waitForSync(qu, index, 10)); + + verifyState(qu, index, leader); + + zk1.close(); + zk2.close(); + zk3.close(); + + qu.shutdownAll(); + } + + /** + * This test: + * Starts up 3 ZKs. The non-leader ZKs are writing to cluster + * Shut down one of the non-leader ZKs. + * Restart after sessions have expired but <500 txns have taken place (get a diff) + * Shut down immediately after restarting, start running separate thread with other transactions + * Restart to a diff while transactions are running in leader + * + * + * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that + * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions + * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions + * would be missed + * + * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed, + * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions + * during the leader's diff forwarding. + * + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + * @throws Throwable + */ + + @Test + public void testResyncByDiffAfterFollowerCrashes() + throws IOException, InterruptedException, KeeperException, Throwable + { + final Semaphore sem = new Semaphore(0); + + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + CountdownWatcher watcher1 = new CountdownWatcher(); + CountdownWatcher watcher2 = new CountdownWatcher(); + CountdownWatcher watcher3 = new CountdownWatcher(); + + int index = 1; + while(qu.getPeer(index).peer.leader == null) { + index++; + } + + Leader leader = qu.getPeer(index).peer.leader; + assertNotNull(leader); + + /* Reusing the index variable to select a follower to connect to */ + index = (index == 1) ? 2 : 1; + LOG.info("Connecting to follower: {}", index); + + final ZooKeeper zk1 = + createClient(qu.getPeer(index).peer.getClientPort(), watcher1); + LOG.info("zk1 has session id 0x{}", Long.toHexString(zk1.getSessionId())); + + final ZooKeeper zk2 = + createClient(qu.getPeer(index).peer.getClientPort(), watcher2); + LOG.info("zk2 has session id 0x{}", Long.toHexString(zk2.getSessionId())); + + final ZooKeeper zk3 = + createClient(qu.getPeer(3).peer.getClientPort(), watcher3); + LOG.info("zk3 has session id 0x{}", Long.toHexString(zk3.getSessionId())); + + zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); + + final AtomicBoolean runNow = new AtomicBoolean(false); + Thread mytestfooThread = new Thread(new Runnable() { + + @Override + public void run() { + int inSyncCounter = 0; + while(inSyncCounter < 400) { + if(runNow.get()) { + zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet();; + } + if(counter.get() > 7300){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + try { + Thread.sleep(10); + } catch (Exception e) { + } + inSyncCounter++; + } else { + Thread.yield(); + } + } + + } + }); + + mytestfooThread.start(); + for(int i = 0; i < 5000; i++) { + zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet();; + } + if(counter.get() > 7300){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + if(i == 1000){ + qu.shutdown(index); + Thread.sleep(1100); + LOG.info("Shutting down s1"); + } + if(i == 1100 || i == 1150 || i == 1200) { + Thread.sleep(1000); + } + + if(i == 1200){ + qu.startThenShutdown(index); + runNow.set(true); + qu.restart(index); + LOG.info("Setting up server: {}", index); + } + + if(i>=1000 && i%2== 0) { + zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() { + + @Override + public void processResult(int rc, String path, Object ctx, String name) { + pending.decrementAndGet(); + counter.incrementAndGet(); + if (rc != 0) { + errors.incrementAndGet(); + } + if(counter.get() > 7300){ + sem.release(); + } + } + }, null); + pending.incrementAndGet(); + } + if(i == 1050 || i == 1100 || i == 1150) { + Thread.sleep(1000); + } + } + + // Wait until all updates return + if(!sem.tryAcquire(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) { + LOG.warn("Did not aquire semaphore fast enough"); + } + mytestfooThread.join(ClientBase.CONNECTION_TIMEOUT); + if (mytestfooThread.isAlive()) { + LOG.error("mytestfooThread is still alive"); + } + + assertTrue(waitForPendingRequests(60)); + assertTrue(waitForSync(qu, index, 10)); + // Verify that server is following and has the same epoch as the leader + + verifyState(qu, index, leader); + + zk1.close(); + zk2.close(); + zk3.close(); + + qu.shutdownAll(); + } + + private static DisconnectableZooKeeper createClient(int port, + CountdownWatcher watcher) + throws IOException, TimeoutException, InterruptedException + { + DisconnectableZooKeeper zk = new DisconnectableZooKeeper( + "127.0.0.1:" + port, ClientBase.CONNECTION_TIMEOUT, watcher); + + watcher.waitForConnected(CONNECTION_TIMEOUT); + return zk; + } + + /** + * Wait for all async operation to return. So we know that we can start + * verifying the state + */ + private boolean waitForPendingRequests(int timeout) throws InterruptedException { + LOG.info("Wait for pending requests: {}", pending.get()); + for (int i = 0; i < timeout; ++i) { + Thread.sleep(1000); + if (pending.get() == 0) { + return true; + } + } + LOG.info("Timeout waiting for pending requests: {}", pending.get()); + return false; + } + + /** + * Wait for all server to have the same lastProccessedZxid. Timeout in seconds + */ + private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException{ + LOG.info("Wait for server to sync"); + int leaderIndex = (index == 1) ? 2 : 1; + ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase(); + ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase(); + ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase(); + long leadZxid = 0; + long cleanZxid = 0; + long restartedZxid = 0; + for (int i = 0; i < timeout; ++i) { + leadZxid = leadDb.getDataTreeLastProcessedZxid(); + cleanZxid = cleanDb.getDataTreeLastProcessedZxid(); + restartedZxid = restartedDb.getDataTreeLastProcessedZxid(); + if (leadZxid == cleanZxid && leadZxid == restartedZxid) { + return true; + } + Thread.sleep(1000); + } + LOG.info("Timeout waiting for zxid to sync: leader 0x{}" + + "clean 0x{}" + + "restarted 0x{}", Long.toHexString(leadZxid), Long.toHexString(cleanZxid), + Long.toHexString(restartedZxid)); + return false; + } + + private static TestableZooKeeper createTestableClient(String hp) + throws IOException, TimeoutException, InterruptedException + { + CountdownWatcher watcher = new CountdownWatcher(); + return createTestableClient(watcher, hp); + } + + private static TestableZooKeeper createTestableClient( + CountdownWatcher watcher, String hp) + throws IOException, TimeoutException, InterruptedException + { + TestableZooKeeper zk = new TestableZooKeeper( + hp, ClientBase.CONNECTION_TIMEOUT, watcher); + + watcher.waitForConnected(CONNECTION_TIMEOUT); + return zk; + } + + private void verifyState(QuorumUtil qu, int index, Leader leader) { + LOG.info("Verifying state"); + assertTrue("Not following", qu.getPeer(index).peer.follower != null); + long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L); + long epochL = (leader.getEpoch() >> 32L); + assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() + + "Current epoch: " + epochF, epochF == epochL); + int leaderIndex = (index == 1) ? 2 : 1; + Collection sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions(); + Collection sessionsNotRestarted = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase().getSessions(); + + for(Long l : sessionsRestarted) { + assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); + } + assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size()); + ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase(); + ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase(); + ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase(); + for(Long l : sessionsRestarted) { + LOG.info("Validating ephemeral for session id 0x{}", Long.toHexString(l)); + assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l)); + Set ephemerals = restarted.getEphemerals(l); + Set cleanEphemerals = clean.getEphemerals(l); + for(String o : cleanEphemerals) { + if(!ephemerals.contains(o)) { + LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}", + o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid())); + } + } + for(String o : ephemerals) { + if(!cleanEphemerals.contains(o)) { + LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}", + o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid())); + } + } + Set leadEphemerals = lead.getEphemerals(l); + for(String o : leadEphemerals) { + if(!cleanEphemerals.contains(o)) { + LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}", + o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid())); + } + } + for(String o : cleanEphemerals) { + if(!leadEphemerals.contains(o)) { + LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}", + o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid())); + } + } + assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size()); + assertEquals("Leader should equal follower", lead.getEphemerals(l).size(), cleanEphemerals.size()); + } + } + + /** + * Verify that the server is sending the proper zxid. See ZOOKEEPER-1412. + */ + @Test + public void testFollowerSendsLastZxid() throws Exception { + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + + int index = 1; + while(qu.getPeer(index).peer.follower == null) { + index++; + } + LOG.info("Connecting to follower: {}", index); + + TestableZooKeeper zk = + createTestableClient("localhost:" + qu.getPeer(index).peer.getClientPort()); + + assertEquals(0L, zk.testableLastZxid()); + zk.exists("/", false); + long lzxid = zk.testableLastZxid(); + assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0); + zk.close(); + qu.shutdownAll(); + } + + private class MyWatcher extends CountdownWatcher { + LinkedBlockingQueue events = + new LinkedBlockingQueue(); + + public void process(WatchedEvent event) { + super.process(event); + if (event.getType() != Event.EventType.None) { + try { + events.put(event); + } catch (InterruptedException e) { + LOG.warn("ignoring interrupt during event.put"); + } + } + } + } + + /** + * Verify that the server is sending the proper zxid, and as a result + * the watch doesn't fire. See ZOOKEEPER-1412. + */ + @Test + public void testFollowerWatcherResync() throws Exception { + QuorumUtil qu = new QuorumUtil(1); + qu.startAll(); + + int index = 1; + while(qu.getPeer(index).peer.follower == null) { + index++; + } + LOG.info("Connecting to follower: {}", index); + + TestableZooKeeper zk1 = createTestableClient( + "localhost:" + qu.getPeer(index).peer.getClientPort()); + zk1.create("/foo", "foo".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + MyWatcher watcher = new MyWatcher(); + TestableZooKeeper zk2 = createTestableClient(watcher, + "localhost:" + qu.getPeer(index).peer.getClientPort()); + + zk2.exists("/foo", true); + + watcher.reset(); + zk2.testableConnloss(); + if (!watcher.clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) + { + fail("Unable to connect to server"); + } + assertArrayEquals("foo".getBytes(), zk2.getData("/foo", false, null)); + + assertNull(watcher.events.poll(5, TimeUnit.SECONDS)); + + zk1.close(); + zk2.close(); + qu.shutdownAll(); + } + +}