hadoop-zookeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maha...@apache.org
Subject svn commit: r882744 - in /hadoop/zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/
Date Fri, 20 Nov 2009 22:28:41 GMT
Author: mahadev
Date: Fri Nov 20 22:28:38 2009
New Revision: 882744

URL: http://svn.apache.org/viewvc?rev=882744&view=rev
Log:
ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created outside of normal
processing (ben reed and mahadev via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=882744&r1=882743&r2=882744&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Fri Nov 20 22:28:38 2009
@@ -130,6 +130,9 @@
   ZOOKEEPER-576. docs need to be updated for session moved exception and how
   to handle it (breed via phunt)
 
+  ZOOKEEPER-582. ZooKeeper can revert to old data when a snapshot is created
+  outside of normal processing (ben reed and mahadev via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=882744&r1=882743&r2=882744&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Fri Nov 20 22:28:38 2009
@@ -64,6 +64,14 @@
             try {
                 connectToLeader(addr);
                 long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
+                //check to see if the leader zxid is lower than ours
+                //this should never happen but is just a safety check
+                long lastLoggedZxid = self.getLastLoggedZxid();
+                if ((newLeaderZxid >> 32L) < (lastLoggedZxid >> 32L)) {
+                    LOG.fatal("Leader epoch " + Long.toHexString(newLeaderZxid >> 32L)
+                            + " is less than our epoch " + Long.toHexString(lastLoggedZxid
>> 32L));
+                    throw new IOException("Error: Epoch of leader is lower");
+                }
                 syncWithLeader(newLeaderZxid);                
                 QuorumPacket qp = new QuorumPacket();
                 while (self.running) {

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=882744&r1=882743&r2=882744&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Nov 20 22:28:38 2009
@@ -36,6 +36,7 @@
 import org.apache.zookeeper.server.NIOServerCnxn;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 
@@ -431,9 +432,35 @@
                 new NIOServerCnxn.Factory(clientPort), quorumConfig);
     }
     
-    public long getLastLoggedZxid(){
-        return logFactory.getLastLoggedZxid();
+    /**
+     * returns the highest zxid that this host has seen
+     * 
+     * @return the highest zxid for this host
+     */
+    public long getLastLoggedZxid() {
+        /*
+         * it is possible to have the last zxid with just a snapshot and no log
+         * related to it. one example is during upgrade wherein the there is no
+         * corresponding log to the snapshot. in that case just use the snapshot
+         * zxid
+         */
+
+        File lastSnapshot = null;
+        long maxZxid = -1L;
+        long maxLogZxid = logFactory.getLastLoggedZxid();
+        try {
+            lastSnapshot = logFactory.findMostRecentSnapshot();
+            if (lastSnapshot != null) {
+                maxZxid = Math.max(Util.getZxidFromName(lastSnapshot.getName(),
+                        "snapshot"), maxLogZxid);
+            }
+        } catch (IOException ie) {
+            LOG.warn("Exception finding last snapshot ", ie);
+            maxZxid = maxLogZxid;
+        }
+        return maxZxid;
     }
+    
     public Follower follower;
     public Leader leader;
     public Observer observer;

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=882744&r1=882743&r2=882744&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Fri Nov 20 22:28:38 2009
@@ -181,7 +181,12 @@
     {
         String split[] = hp.split(":");
         String host = split[0];
-        int port = Integer.parseInt(split[1]);
+        int port;
+        try {
+            port = Integer.parseInt(split[1]);
+        } catch(RuntimeException e) {
+            throw new RuntimeException("Problem parsing " + hp + e.toString());
+        }
 
         Socket sock = new Socket(host, port);
         BufferedReader reader = null;
@@ -211,6 +216,8 @@
         long start = System.currentTimeMillis();
         while (true) {
             try {
+                // if there are multiple hostports, just take the first one
+                hp = hp.split(",")[0];
                 String result = send4LetterWord(hp, "stat");
                 if (result.startsWith("Zookeeper version:")) {
                     return true;

Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=882744&r1=882743&r2=882744&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Nov 20 22:28:38 2009
@@ -146,7 +146,7 @@
         s5.start();
         LOG.info("started QuorumPeer 5");
 
-        LOG.info ("Closing ports " + hostPort);
+        LOG.info ("Checking ports " + hostPort);
         for (String hp : hostPort.split(",")) {
             assertTrue("waiting for server up",
                        ClientBase.waitForServerUp(hp,
@@ -176,6 +176,33 @@
         }
         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
     }
+    public void setupServers() throws IOException {
+        int tickTime = 2000;
+        int initLimit = 3;
+        int syncLimit = 3;
+        HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
+        peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1",
port1 + 1000)));
+        peers.put(Long.valueOf(2), new QuorumServer(2, new InetSocketAddress("127.0.0.1",
port2 + 1000)));
+        peers.put(Long.valueOf(3), new QuorumServer(3, new InetSocketAddress("127.0.0.1",
port3 + 1000)));
+        peers.put(Long.valueOf(4), new QuorumServer(4, new InetSocketAddress("127.0.0.1",
port4 + 1000)));
+        peers.put(Long.valueOf(5), new QuorumServer(5, new InetSocketAddress("127.0.0.1",
port5 + 1000)));
+
+        LOG.info("creating QuorumPeer 1 port " + port1);
+        s1 = new QuorumPeer(peers, s1dir, s1dir, port1, 0, 1, tickTime, initLimit, syncLimit);
+        assertEquals(port1, s1.getClientPort());
+        LOG.info("creating QuorumPeer 2 port " + port2);
+        s2 = new QuorumPeer(peers, s2dir, s2dir, port2, 0, 2, tickTime, initLimit, syncLimit);
+        assertEquals(port2, s2.getClientPort());
+        LOG.info("creating QuorumPeer 3 port " + port3);
+        s3 = new QuorumPeer(peers, s3dir, s3dir, port3, 0, 3, tickTime, initLimit, syncLimit);
+        assertEquals(port3, s3.getClientPort());
+        LOG.info("creating QuorumPeer 4 port " + port4);
+        s4 = new QuorumPeer(peers, s4dir, s4dir, port4, 0, 4, tickTime, initLimit, syncLimit);
+        assertEquals(port4, s4.getClientPort());
+        LOG.info("creating QuorumPeer 5 port " + port5);
+        s5 = new QuorumPeer(peers, s5dir, s5dir, port5, 0, 5, tickTime, initLimit, syncLimit);
+        assertEquals(port5, s5.getClientPort());
+    }
 
     @After
     @Override
@@ -191,11 +218,7 @@
                     + unixos.getOpenFileDescriptorCount());
         }
 
-        shutdown(s1);
-        shutdown(s2);
-        shutdown(s3);
-        shutdown(s4);
-        shutdown(s5);
+        shutdownServers();
 
         for (String hp : hostPort.split(",")) {
             assertTrue("waiting for server down",
@@ -208,6 +231,13 @@
 
         LOG.info("FINISHED " + getName());
     }
+    public void shutdownServers() {
+        shutdown(s1);
+        shutdown(s2);
+        shutdown(s3);
+        shutdown(s4);
+        shutdown(s5);
+    }
 
     protected void shutdown(QuorumPeer qp) {
         try {

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java?rev=882744&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumZxidSyncTest.java Fri Nov 20 22:28:38 2009
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+
+import junit.framework.TestCase;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test what happens when logs fall behind the snapshots or a follower has a
+ * higher epoch than a leader.
+ */
+public class QuorumZxidSyncTest extends TestCase {
+    QuorumBase qb = new QuorumBase();
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        qb.setUp();
+    }
+    
+    @Test
+    /**
+     * find out what happens when a follower connects to leader that is behind
+     */
+    public void testBehindLeader() throws Exception {
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        deleteFiles(qb.s1dir);
+        deleteFiles(qb.s2dir);
+        deleteFiles(qb.s3dir);
+        deleteFiles(qb.s4dir);
+        qb.setupServers();
+        qb.s1.start();
+        qb.s2.start();
+        qb.s3.start();
+        qb.s4.start();
+        assertTrue("Servers didn't come up", ClientBase.waitForServerUp(qb.hostPort, 10000));
+        qb.s5.start();
+        String hostPort = "127.0.0.1:" + qb.s5.getClientPort();
+        assertFalse("Servers came up, but shouldn't have since it's ahead of leader",
+                ClientBase.waitForServerUp(hostPort, 10000));
+    }
+    
+    private void deleteFiles(File f) {
+        File v = new File(f, "version-2");
+        for(File c: v.listFiles()) {
+            c.delete();
+        }
+    }
+    
+    @Test
+    /**
+     * find out what happens when the latest state is in the snapshots not
+     * the logs.
+     */
+    public void testLateLogs() throws Exception {
+        // crank up the epoch numbers
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        ZooKeeper zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/0", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        qb.shutdownServers();
+        deleteLogs(qb.s1dir);
+        deleteLogs(qb.s2dir);
+        deleteLogs(qb.s3dir);
+        deleteLogs(qb.s4dir);
+        deleteLogs(qb.s5dir);
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        zk.create("/2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        qb.shutdownServers();
+        qb.startServers();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+        zk = new ZooKeeper(qb.hostPort, 10000, new Watcher() {
+            public void process(WatchedEvent event) {
+            }});
+        boolean saw2 = false;
+        for(String child: zk.getChildren("/", false)) {
+            if (child.equals("2")) {
+                saw2 = true;
+            }
+        }
+        zk.close();
+        assertTrue("Didn't see /2 (went back in time)", saw2);
+    }
+    
+    private void deleteLogs(File f) {
+        File v = new File(f, "version-2");
+        for(File c: v.listFiles()) {
+            if (c.getName().startsWith("log")) {
+                c.delete();
+            }
+        }
+    }
+    
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        qb.tearDown();
+    }
+}



Mime
View raw message