Return-Path: X-Original-To: apmail-zookeeper-commits-archive@www.apache.org Delivered-To: apmail-zookeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5BFD3D1C6 for ; Tue, 17 Jul 2012 21:25:39 +0000 (UTC) Received: (qmail 50353 invoked by uid 500); 17 Jul 2012 21:25:39 -0000 Delivered-To: apmail-zookeeper-commits-archive@zookeeper.apache.org Received: (qmail 50321 invoked by uid 500); 17 Jul 2012 21:25:39 -0000 Mailing-List: contact commits-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ Delivered-To: mailing list commits@zookeeper.apache.org Received: (qmail 50310 invoked by uid 99); 17 Jul 2012 21:25:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 21:25:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 21:25:33 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 814AF2388962 for ; Tue, 17 Jul 2012 21:25:12 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1362660 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/persistence/ src/java/test/org/apache/zookeeper/server/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/... Date: Tue, 17 Jul 2012 21:25:12 -0000 To: commits@zookeeper.apache.org From: phunt@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120717212512.814AF2388962@eris.apache.org> Author: phunt Date: Tue Jul 17 21:25:11 2012 New Revision: 1362660 URL: http://svn.apache.org/viewvc?rev=1362660&view=rev Log: ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt) Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java Modified: zookeeper/trunk/CHANGES.txt zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java Modified: zookeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1362660&r1=1362659&r2=1362660&view=diff ============================================================================== --- zookeeper/trunk/CHANGES.txt (original) +++ zookeeper/trunk/CHANGES.txt Tue Jul 17 21:25:11 2012 @@ -209,6 +209,8 @@ BUGFIXES: ZOOKEEPER-1427. Writing to local files is done non-atomically (phunt) + ZOOKEEPER-1489. Data loss after truncate on transaction log (phunt) + IMPROVEMENTS: ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports, Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1362660&r1=1362659&r2=1362660&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Tue Jul 17 21:25:11 2012 @@ -414,16 +414,23 @@ public class ZKDatabase { } /** - * truncate the zkdatabase to this zxid + * Truncate the ZKDatabase to the specified zxid * @param zxid the zxid to truncate zk database to - * @return true if the truncate is succesful and false if not + * @return true if the truncate is successful and false if not * @throws IOException */ public boolean truncateLog(long zxid) throws IOException { clear(); - boolean truncated = this.snapLog.truncateLog(zxid); + + // truncate the log + boolean truncated = snapLog.truncateLog(zxid); + + if (!truncated) { + return false; + } + loadDataBase(); - return truncated; + return true; } /** Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1362660&r1=1362659&r2=1362660&view=diff ============================================================================== --- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original) +++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Tue Jul 17 21:25:11 2012 @@ -47,12 +47,12 @@ import org.slf4j.LoggerFactory; public class FileTxnSnapLog { //the direcotry containing the //the transaction logs - File dataDir; + private final File dataDir; //the directory containing the //the snapshot directory - File snapDir; - TxnLog txnLog; - SnapShot snapLog; + private final File snapDir; + private TxnLog txnLog; + private SnapShot snapLog; public final static int VERSION = 2; public final static String version = "version-"; @@ -81,6 +81,8 @@ public class FileTxnSnapLog { * @param snapDir the snapshot directory */ public FileTxnSnapLog(File dataDir, File snapDir) throws IOException { + LOG.debug("Opening datadir:{} snapDir:{}", dataDir, snapDir); + this.dataDir = new File(dataDir, version + VERSION); this.snapDir = new File(snapDir, version + VERSION); @@ -279,8 +281,22 @@ public class FileTxnSnapLog { * @throws IOException */ public boolean truncateLog(long zxid) throws IOException { - FileTxnLog txnLog = new FileTxnLog(dataDir); - return txnLog.truncate(zxid); + // close the existing txnLog and snapLog + close(); + + // truncate it + FileTxnLog truncLog = new FileTxnLog(dataDir); + boolean truncated = truncLog.truncate(zxid); + truncLog.close(); + + // re-open the txnLog and snapLog + // I'd rather just close/reopen this object itself, however that + // would have a big impact outside ZKDatabase as there are other + // objects holding a reference to this object. + txnLog = new FileTxnLog(dataDir); + snapLog = new FileSnap(snapDir); + + return truncated; } /** Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java?rev=1362660&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/TruncateCorruptionTest.java Tue Jul 17 21:25:11 2012 @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKTestCase; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.ZooKeeper.States; +import org.apache.zookeeper.client.FourLetterWordMain; +import org.apache.zookeeper.server.quorum.QuorumPeerTestBase.MainThread; +import org.apache.zookeeper.server.util.PortForwarder; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Verify ZOOKEEPER-1489 - cause truncation followed by continued append to the + * snaplog, verify that the newly appended information (after the truncation) is + * readable. + */ +public class TruncateCorruptionTest extends ZKTestCase { + + private static final Logger LOG = LoggerFactory + .getLogger(TruncateCorruptionTest.class); + + public interface Check { + boolean doCheck(); + } + + public static boolean await(Check check, long timeoutMillis) + throws InterruptedException { + long end = System.currentTimeMillis() + timeoutMillis; + while (end > System.currentTimeMillis()) { + if (check.doCheck()) { + LOG.debug("await succeeded after " + + (System.currentTimeMillis() - end + timeoutMillis)); + return true; + } + Thread.sleep(50); + } + LOG.debug("await failed in {}", timeoutMillis); + return false; + } + + @Test + public void testTransactionLogCorruption() throws Exception { + // configure the ports for that test in a way so that we can disrupt the + // connection for wrapper1 + ZookeeperServerWrapper wrapper1 = new ZookeeperServerWrapper(1, 7000); + ZookeeperServerWrapper wrapper2 = new ZookeeperServerWrapper(2, 8000); + ZookeeperServerWrapper wrapper3 = new ZookeeperServerWrapper(3, 8000); + + wrapper2.start(); + wrapper3.start(); + + try { + wrapper2.await(ClientBase.CONNECTION_TIMEOUT); + wrapper3.await(ClientBase.CONNECTION_TIMEOUT); + } catch (Exception e) { + ClientBase.logAllStackTraces(); + throw e; + } + List pfs = startForwarding(); + Thread.sleep(1000); + wrapper1.start(); + wrapper1.await(ClientBase.CONNECTION_TIMEOUT); + + final ZooKeeper zk1 = new ZooKeeper("localhost:8201", + ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk1")); + waitForConnection(zk1); + zk1.create("/test", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + + // wait a little until stuff is synced in between servers + Thread.sleep(1000); + wrapper2.stop(); + // wait for reconnect + waitForConnection(zk1); + zk1.create("/test2", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + // now we stop them to force a situation where a TRUNC event is sent to + // the followers + wrapper3.stop(); + // simulate a short interruption in network in between + + stopForwarding(pfs); + LOG.info("interrupted network connection ... waiting for zk1 and zk2 to realize"); + + Assert.assertTrue(await(new Check() { + + public boolean doCheck() { + if (zk1.getState() == States.CONNECTING) { + List children; + try { + children = zk1.getChildren("/", false); + + return children.size() != 0; + } catch (KeeperException.ConnectionLossException e) { + // just to be sure + return true; + } catch (Exception e) { + // silently fail + } + } + return false; + } + }, TimeUnit.MINUTES.toMillis(2))); + + // let's clean the data dir of zk3 so that an ensemble of 2 and 3 is + // less advanced than 1 (just to force an event where we get a TRUNCATE + // message) + wrapper3.clean(); + wrapper2.start(); + wrapper3.start(); + LOG.info("Waiting for zk2 and zk3 to form a quorum"); + + wrapper2.await(ClientBase.CONNECTION_TIMEOUT); + wrapper3.await(ClientBase.CONNECTION_TIMEOUT); + ZooKeeper zk2 = new ZooKeeper("localhost:8202", + ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk2")); + waitForConnection(zk2); + + LOG.info("re-establishing network connection and waiting for zk1 to reconnect"); + pfs = startForwarding(); + waitForConnection(zk1); + + // create more data ... + LOG.info("Creating node test3"); + zk1.create("/test3", "testdata".getBytes(), Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + Thread.sleep(250); + LOG.info("List of children at zk2 before zk1 became master"); + List children2 = zk2.getChildren("/", false); + LOG.info(children2.toString()); + + LOG.info("List of children at zk1 before zk1 became master"); + List children1 = zk1.getChildren("/", false); + LOG.info(children1.toString()); + + // now cause zk1 to become master and test3 will be lost + LOG.info("restarting zk2 and zk3 while cleaning zk3 to enforce zk1 to become master"); + wrapper2.stop(); + wrapper3.stop(); + wrapper3.clean(); + wrapper3.start(); + wrapper3.await(TimeUnit.MINUTES.toMillis(2)); + ZooKeeper zk3 = new ZooKeeper("localhost:8203", + ClientBase.CONNECTION_TIMEOUT, new ZkWatcher("zk3")); + waitForConnection(zk3); + LOG.info("Zk1 and zk3 have a quorum, now starting zk2"); + wrapper2.start(); + waitForConnection(zk2); + LOG.info("List of children at zk2"); + children2 = zk2.getChildren("/", false); + LOG.info(children2.toString()); + + waitForConnection(zk1); + LOG.info("List of children at zk1"); + children1 = zk1.getChildren("/", false); + Assert.assertTrue("test3 node is missing on zk1", + children1.contains("test3")); + Assert.assertTrue("test3 node is missing on zk2", + children2.contains("test3")); + Assert.assertEquals(children1, children2); + stopForwarding(pfs); + } + + /** + * @param pfs + * @throws Exception + */ + private void stopForwarding(List pfs) throws Exception { + for (PortForwarder pf : pfs) { + pf.shutdown(); + } + } + + /** + * @return + * @throws IOException + */ + private List startForwarding() throws IOException { + List res = new ArrayList(); + res.add(new PortForwarder(8301, 7301)); + res.add(new PortForwarder(8401, 7401)); + res.add(new PortForwarder(7302, 8302)); + res.add(new PortForwarder(7402, 8402)); + res.add(new PortForwarder(7303, 8303)); + res.add(new PortForwarder(7403, 8403)); + return res; + } + + /** + * @param zk + * @throws InterruptedException + */ + private void waitForConnection(final ZooKeeper zk) + throws InterruptedException { + Assert.assertTrue(await(new Check() { + + public boolean doCheck() { + if (zk.getState() == States.CONNECTED) { + List children; + try { + children = zk.getChildren("/", false); + + return children.size() != 0; + } catch (Exception e) { + // silently fail + } + } + return false; + } + }, TimeUnit.MINUTES.toMillis(2))); + } + + static class ZkWatcher implements Watcher { + + private final String clientId; + + ZkWatcher(String clientId) { + this.clientId = clientId; + } + + public void process(WatchedEvent event) { + LOG.info("<<>> " + clientId + " - WatchedEvent: " + + event); + } + } + + public static class ZookeeperServerWrapper { + + private static final Logger LOG = LoggerFactory + .getLogger(ZookeeperServerWrapper.class); + + private final MainThread server; + private final int clientPort; + + public ZookeeperServerWrapper(int serverId, int portBase) + throws IOException { + clientPort = 8200 + serverId; + + // start client port on 8200 + serverId + // start servers on portbase + 300 or + 400 (+serverId) + String quorumCfgSection = "server.1=127.0.0.1:" + (portBase + 301) + + ":" + (portBase + 401) + ";" + (8200 + 1) + + "\nserver.2=127.0.0.1:" + (portBase + 302) + ":" + + (portBase + 402) + ";" + (8200 + 2) + + "\nserver.3=127.0.0.1:" + (portBase + 303) + ":" + + (portBase + 403) + ";" + (8200 + 3); + + server = new MainThread(serverId, clientPort, quorumCfgSection); + } + + public void start() throws Exception { + server.start(); + } + + public void await(long timeout) throws Exception { + long deadline = System.currentTimeMillis() + timeout; + String result = "?"; + while (deadline > System.currentTimeMillis()) { + try { + result = FourLetterWordMain.send4LetterWord("127.0.0.1", + clientPort, "stat"); + if (result.startsWith("Zookeeper version:")) { + LOG.info("Started zookeeper server on port " + + clientPort); + return; + } + } catch (IOException e) { + // ignore as this is expected + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + } + LOG.info(result); + throw new Exception("Failed to connect to zookeeper server"); + } + + public void stop() { + try { + server.shutdown(); + } catch (InterruptedException e) { + LOG.info("Interrupted while shutting down"); + } + } + + public void clean() throws IOException { + server.clean(); + } + } +} Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1362660&r1=1362659&r2=1362660&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Tue Jul 17 21:25:11 2012 @@ -34,18 +34,18 @@ import org.apache.zookeeper.test.ClientB import org.apache.zookeeper.test.QuorumBase; /** - * Has some common functionality for tests that work with QuorumPeers. - * Override process(WatchedEvent) to implement the Watcher interface + * Has some common functionality for tests that work with QuorumPeers. Override + * process(WatchedEvent) to implement the Watcher interface */ public class QuorumPeerTestBase extends ZKTestCase implements Watcher { - protected static final Logger LOG = - LoggerFactory.getLogger(QuorumPeerTestBase.class); - + protected static final Logger LOG = LoggerFactory + .getLogger(QuorumPeerTestBase.class); + public void process(WatchedEvent event) { // ignore for this test } - public static class TestQPMain extends QuorumPeerMain { + public static class TestQPMain extends QuorumPeerMain { public void shutdown() { // ensure it closes - in particular wait for thread to exit if (quorumPeer != null) { @@ -60,13 +60,13 @@ public class QuorumPeerTestBase extends volatile TestQPMain main; public MainThread(int myid, int clientPort, String quorumCfgSection) - throws IOException - { + throws IOException { File tmpDir = ClientBase.createTmpDir(); - LOG.info("id = " + myid + " tmpDir = " + tmpDir); + LOG.info("id = " + myid + " tmpDir = " + tmpDir + " clientPort = " + + clientPort); confFile = new File(tmpDir, "zoo.cfg"); dynamicConfigFile = new File(tmpDir, "zoo.dynamic"); - + FileWriter fwriter = new FileWriter(confFile); fwriter.write("tickTime=4000\n"); fwriter.write("initLimit=10\n"); @@ -83,14 +83,15 @@ public class QuorumPeerTestBase extends String osname = java.lang.System.getProperty("os.name"); if (osname.toLowerCase().contains("windows")) { dir = dir.replace('\\', '/'); - dynamicConfigFilename = dynamicConfigFilename.replace('\\', '/'); + dynamicConfigFilename = dynamicConfigFilename + .replace('\\', '/'); } fwriter.write("dataDir=" + dir + "\n"); - + fwriter.write("clientPort=" + clientPort + "\n"); - + fwriter.write("dynamicConfigFile=" + dynamicConfigFilename + "\n"); - + fwriter.flush(); fwriter.close(); @@ -107,11 +108,13 @@ public class QuorumPeerTestBase extends } Thread currentThread; + synchronized public void start() { - main = new TestQPMain(); - currentThread = new Thread(this); - currentThread.start(); + main = new TestQPMain(); + currentThread = new Thread(this); + currentThread.start(); } + public void run() { String args[] = new String[1]; args[0] = confFile.toString(); @@ -121,26 +124,33 @@ public class QuorumPeerTestBase extends // test will still fail even though we just log/ignore LOG.error("unexpected exception in run", e); } finally { - currentThread = null; + currentThread = null; } } public void shutdown() throws InterruptedException { - Thread t = currentThread; - if (t != null && t.isAlive()) { - main.shutdown(); - t.join(500); - } - } - public void join(long timeout) throws InterruptedException { - Thread t = currentThread; - if (t != null) { - t.join(timeout); - } - } - public boolean isAlive() { - Thread t = currentThread; - return t != null && t.isAlive(); - } + Thread t = currentThread; + if (t != null && t.isAlive()) { + main.shutdown(); + t.join(500); + } + } + + public void join(long timeout) throws InterruptedException { + Thread t = currentThread; + if (t != null) { + t.join(timeout); + } + } + + public boolean isAlive() { + Thread t = currentThread; + return t != null && t.isAlive(); + } + + public void clean() { + ClientBase.recursiveDelete(main.quorumPeer.getTxnFactory() + .getDataDir()); + } } } Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java?rev=1362660&view=auto ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java (added) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/PortForwarder.java Tue Jul 17 21:25:11 2012 @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.zookeeper.server.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ConnectException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PortForwarder extends Thread { + private static final Logger LOG = LoggerFactory + .getLogger(PortForwarder.class); + + private static class PortForwardWorker implements Runnable { + + private final InputStream in; + private final OutputStream out; + private final Socket toClose; + private final Socket toClose2; + + PortForwardWorker(Socket toClose, Socket toClose2, InputStream in, + OutputStream out) throws IOException { + this.toClose = toClose; + this.toClose2 = toClose2; + this.in = in; + this.out = out; + // LOG.info("starting forward for "+toClose); + } + + public void run() { + Thread.currentThread().setName(toClose.toString() + "-->" + + toClose2.toString()); + byte[] buf = new byte[1024]; + try { + while (true) { + try { + int read = this.in.read(buf); + if (read > 0) { + try { + this.out.write(buf, 0, read); + } catch (IOException e) { + LOG.warn("exception during write", e); + try { + toClose.close(); + } catch (IOException ex) { + // ignore + } + try { + toClose2.close(); + } catch (IOException ex) { + // ignore + } + break; + } + } + } catch (SocketTimeoutException e) { + LOG.error("socket timeout", e); + } + Thread.sleep(1); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted", e); + try { + toClose.close(); + } catch (IOException ex) { + // ignore + } + try { + toClose2.close(); + } catch (IOException ex) { + // ignore silently + } + } catch (SocketException e) { + if (!"Socket closed".equals(e.getMessage())) { + LOG.error("Unexpected exception", e); + } + } catch (IOException e) { + LOG.error("Unexpected exception", e); + } + LOG.info("Shutting down forward for " + toClose); + } + + } + + private volatile boolean stopped = false; + private ExecutorService workers = Executors.newCachedThreadPool(); + private ServerSocket serverSocket; + private final int to; + + public PortForwarder(int from, int to) throws IOException { + this.to = to; + serverSocket = new ServerSocket(from); + serverSocket.setSoTimeout(30000); + this.start(); + } + + @Override + public void run() { + try { + while (!stopped) { + Socket sock = null; + try { + LOG.info("accepting socket local:" + + serverSocket.getLocalPort() + " to:" + to); + sock = serverSocket.accept(); + LOG.info("accepted: local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to); + Socket target = null; + int retry = 10; + while(sock.isConnected()) { + try { + target = new Socket("localhost", to); + break; + } catch (IOException e) { + if (retry == 0) { + throw e; + } + LOG.warn("connection failed, retrying(" + retry + + "): local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to, e); + } + Thread.sleep(TimeUnit.SECONDS.toMillis(1)); + retry--; + } + LOG.info("connected: local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to); + sock.setSoTimeout(30000); + target.setSoTimeout(30000); + this.workers.execute(new PortForwardWorker(sock, target, + sock.getInputStream(), target.getOutputStream())); + this.workers.execute(new PortForwardWorker(target, sock, + target.getInputStream(), sock.getOutputStream())); + } catch (SocketTimeoutException e) { + LOG.warn("socket timed out local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to, e); + } catch (ConnectException e) { + LOG.warn("connection exception local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to, e); + sock.close(); + } catch (IOException e) { + if (!"Socket closed".equals(e.getMessage())) { + LOG.warn("unexpected exception local:" + sock.getLocalPort() + + " from:" + sock.getPort() + + " to:" + to, e); + throw e; + } + } + + } + } catch (IOException e) { + LOG.error("Unexpected exception to:" + to, e); + } catch (InterruptedException e) { + LOG.error("Interrupted to:" + to, e); + } + } + + public void shutdown() throws Exception { + this.stopped = true; + this.serverSocket.close(); + this.workers.shutdownNow(); + try { + if (!this.workers.awaitTermination(5, TimeUnit.SECONDS)) { + throw new Exception( + "Failed to stop forwarding within 5 seconds"); + } + } catch (InterruptedException e) { + throw new Exception("Failed to stop forwarding"); + } + this.join(); + } +} Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java?rev=1362660&r1=1362659&r2=1362660&view=diff ============================================================================== --- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java (original) +++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/TruncateTest.java Tue Jul 17 21:25:11 2012 @@ -23,8 +23,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.HashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -33,15 +32,23 @@ import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnLog; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.TxnHeader; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TruncateTest extends ZKTestCase { private static final Logger LOG = LoggerFactory.getLogger(TruncateTest.class); @@ -69,12 +76,65 @@ public class TruncateTest extends ZKTest connected = event.getState() == Watcher.Event.KeeperState.SyncConnected; } }; - + + @Test + public void testTruncationStreamReset() throws Exception { + File tmpdir = ClientBase.createTmpDir(); + FileTxnSnapLog snaplog = new FileTxnSnapLog(tmpdir, tmpdir); + ZKDatabase zkdb = new ZKDatabase(snaplog); + + for (int i = 1; i <= 100; i++) { + append(zkdb, i); + } + + zkdb.truncateLog(1); + + append(zkdb, 200); + + zkdb.close(); + + // verify that the truncation and subsequent append were processed + // correctly + FileTxnLog txnlog = new FileTxnLog(new File(tmpdir, "version-2")); + TxnIterator iter = txnlog.read(1); + + TxnHeader hdr = iter.getHeader(); + Record txn = iter.getTxn(); + Assert.assertEquals(1, hdr.getZxid()); + Assert.assertTrue(txn instanceof SetDataTxn); + + iter.next(); + + hdr = iter.getHeader(); + txn = iter.getTxn(); + Assert.assertEquals(200, hdr.getZxid()); + Assert.assertTrue(txn instanceof SetDataTxn); + } + + private void append(ZKDatabase zkdb, int i) throws IOException { + TxnHeader hdr = new TxnHeader(1, 1, i, 1, ZooDefs.OpCode.setData); + Record txn = new SetDataTxn("/foo" + i, new byte[0], 1); + Request req = new Request(0, 0, 0, hdr, txn, 0); + + zkdb.append(req); + zkdb.commit(); + } + @Test public void testTruncate() throws IOException, InterruptedException, KeeperException { // Prime the server that is going to come in late with 50 txns - ServerCnxnFactory factory = ClientBase.createNewServerInstance(dataDir1, null, "127.0.0.1:" + baseHostPort, 100); - ZooKeeper zk = new ZooKeeper("127.0.0.1:" + baseHostPort, 15000, nullWatcher); + String hostPort = "127.0.0.1:" + baseHostPort; + ServerCnxnFactory factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100); + ClientBase.shutdownServerInstance(factory, hostPort); + + // standalone starts with 0 epoch while quorum starts with 1 + File origfile = new File(new File(dataDir1, "version-2"), "snapshot.0"); + File newfile = new File(new File(dataDir1, "version-2"), "snapshot.100000000"); + origfile.renameTo(newfile); + + factory = ClientBase.createNewServerInstance(dataDir1, null, hostPort, 100); + + ZooKeeper zk = new ZooKeeper(hostPort, 15000, nullWatcher); for(int i = 0; i < 50; i++) { zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } @@ -137,11 +197,13 @@ public class TruncateTest extends ZKTest } zk1.getData("/9", false, new Stat()); try { - // 10 wont work because the session expiration - // will match the zxid for 10 and so we wont - // actually truncate the zxid for 10 creation - // but for 11 we will for sure - zk1.getData("/11", false, new Stat()); + // /10 wont work because the session expiration + // will match the zxid for /10 and so we wont + // actually truncate the zxid for /10 creation + // due to an artifact of switching the xid of the standalone + // /11 is the last entry in the log for the xid + // as a result /12 is the first of the truncated znodes to check for + zk1.getData("/12", false, new Stat()); Assert.fail("Should have gotten an error"); } catch(KeeperException.NoNodeException e) { // this is what we want