Subject: svn commit: r1373571 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/ src/main/java/org/apache/hadoop/hdfs/qjournal/...
Date: Wed, 15 Aug 2012 18:58:52 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org

Author: todd
Date: Wed Aug 15 18:58:51 2012
New Revision: 1373571

URL: http://svn.apache.org/viewvc?rev=1373571&view=rev
Log:
HDFS-3797. QJM: add segment txid as a parameter to journal() RPC. Contributed by Todd Lipcon.
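This change threads the txid that began the currently-open log segment
through every journal() RPC, so a JournalNode can detect a writer that is
still appending to a stale segment. For orientation, the resulting RPC
signature (as modified in QJournalProtocol.java below; the inline parameter
comments are added here for orientation and are not part of the patch):

    public void journal(RequestInfo reqInfo,
                        long segmentTxId,   // first txid of the segment the writer has open
                        long firstTxnId,    // first txid in this batch of edits
                        int numTxns,        // number of transactions in the batch
                        byte[] records) throws IOException;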
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Wed Aug 15 18:58:51 2012
@@ -22,3 +22,5 @@ HDFS-3795. QJM: validate journal dir at
 HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on
 invalid segment (todd)
 HDFS-3799. QJM: handle empty log segments during recovery (todd)
+
+HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Wed Aug 15 18:58:51 2012
@@ -53,12 +53,14 @@ interface AsyncLogger {
 
   /**
    * Send a batch of edits to the logger.
+   * @param segmentTxId the first txid in the current segment
   * @param firstTxnId the first txid of the edits.
    * @param numTxns the number of transactions in the batch
    * @param data the actual data to be sent
    */
   public ListenableFuture<Void> sendEdits(
-      final long firstTxnId, final int numTxns, final byte[] data);
+      final long segmentTxId, final long firstTxnId,
+      final int numTxns, final byte[] data);
 
   /**
    * Begin writing a new log segment.

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Wed Aug 15 18:58:51 2012
@@ -263,11 +263,11 @@ class AsyncLoggerSet {
   }
 
   public QuorumCall<AsyncLogger, Void> sendEdits(
-      long firstTxnId, int numTxns, byte[] data) {
+      long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
     Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
       ListenableFuture<Void> future =
-          logger.sendEdits(firstTxnId, numTxns, data);
+          logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
       calls.put(logger, future);
     }
     return QuorumCall.create(calls);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Aug 15 18:58:51 2012
@@ -235,7 +235,8 @@ public class IPCLoggerChannel implements
 
   @Override
   public ListenableFuture<Void> sendEdits(
-      final long firstTxnId, final int numTxns, final byte[] data) {
+      final long segmentTxId, final long firstTxnId,
+      final int numTxns, final byte[] data) {
     try {
       reserveQueueSpace(data.length);
     } catch (LoggerTooFarBehindException e) {
@@ -246,7 +247,8 @@ public class IPCLoggerChannel implements
     ret = executor.submit(new Callable<Void>() {
       @Override
       public Void call() throws IOException {
-        getProxy().journal(createReqInfo(), firstTxnId, numTxns, data);
+        getProxy().journal(createReqInfo(),
+            segmentTxId, firstTxnId, numTxns, data);
         return null;
       }
     });

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Wed Aug 15 18:58:51 2012
@@ -353,7 +353,7 @@ public class QuorumJournalManager implem
         "must recover segments before starting a new one");
     QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId);
     loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
-    return new QuorumOutputStream(loggers);
+    return new QuorumOutputStream(loggers, txId);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Wed Aug 15 18:58:51 2012
@@ -31,11 +31,14 @@ import org.apache.hadoop.io.DataOutputBu
 class QuorumOutputStream extends EditLogOutputStream {
   private final AsyncLoggerSet loggers;
   private EditsDoubleBuffer buf;
+  private final long segmentTxId;
 
-  public QuorumOutputStream(AsyncLoggerSet loggers) throws IOException {
+  public QuorumOutputStream(AsyncLoggerSet loggers,
+      long txId) throws IOException {
     super();
     this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
     this.loggers = loggers;
+    this.segmentTxId = txId;
   }
 
   @Override
@@ -96,7 +99,8 @@ class QuorumOutputStream extends EditLog
       assert data.length == bufToSend.getLength();
 
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
-          firstTxToFlush, numReadyTxns, data);
+          segmentTxId, firstTxToFlush,
+          numReadyTxns, data);
       loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
     }
   }
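On the client side the new parameter is plain bookkeeping: the output stream
records the segment's starting txid once, when the segment is opened, and
stamps it onto every batch it flushes. A self-contained toy model of that
bookkeeping (class and method names here are illustrative, not from the
patch):

    // Illustrative model only; the real classes are QuorumOutputStream,
    // AsyncLoggerSet, and IPCLoggerChannel in the diff above.
    public class SegmentStampedWriter {
      private final long segmentTxId; // fixed when the segment is opened
      private long nextTxId;          // next transaction to be sent

      public SegmentStampedWriter(long segmentTxId) {
        this.segmentTxId = segmentTxId;
        this.nextTxId = segmentTxId;
      }

      /** Models AsyncLogger.sendEdits(segmentTxId, firstTxnId, numTxns, data). */
      public void sendBatch(int numTxns, byte[] data) {
        long firstTxnId = nextTxId;
        // Every batch now carries the segment txid in addition to its own
        // first txid and length.
        System.out.printf("journal(segmentTxId=%d, firstTxnId=%d, numTxns=%d, %d bytes)%n",
            segmentTxId, firstTxnId, numTxns, data.length);
        nextTxId += numTxns;
      }

      public static void main(String[] args) {
        SegmentStampedWriter w = new SegmentStampedWriter(1L);
        w.sendBatch(3, new byte[10]); // journal(1, 1, 3, ...)
        w.sendBatch(2, new byte[10]); // journal(1, 4, 2, ...)
      }
    }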
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Wed Aug 15 18:58:51 2012
@@ -72,6 +72,7 @@ public interface QJournalProtocol {
    * to write edits to their local logs.
    */
   public void journal(RequestInfo reqInfo,
+                      long segmentTxId,
                       long firstTxnId,
                       int numTxns,
                       byte[] records) throws IOException;

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Wed Aug 15 18:58:51 2012
@@ -109,8 +109,8 @@ public class QJournalProtocolServerSideT
       JournalRequestProto req) throws ServiceException {
     try {
       impl.journal(convert(req.getReqInfo()),
-          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
-              .toByteArray());
+          req.getSegmentTxnId(), req.getFirstTxnId(),
+          req.getNumTxns(), req.getRecords().toByteArray());
     } catch (IOException e) {
       throw new ServiceException(e);
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Wed Aug 15 18:58:51 2012
@@ -124,10 +124,12 @@ public class QJournalProtocolTranslatorP
   }
 
   @Override
-  public void journal(RequestInfo reqInfo, long firstTxnId, int numTxns,
+  public void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId, int numTxns,
       byte[] records) throws IOException {
     JournalRequestProto req = JournalRequestProto.newBuilder()
         .setReqInfo(convert(reqInfo))
+        .setSegmentTxnId(segmentTxId)
         .setFirstTxnId(firstTxnId)
         .setNumTxns(numTxns)
         .setRecords(PBHelper.getByteString(records))

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Aug 15 18:58:51 2012
@@ -196,9 +196,10 @@ class Journal implements Closeable {
 
   /**
    * Write a batch of edits to the journal.
-   * {@see QJournalProtocol#journal(RequestInfo, long, int, byte[])}
+   * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])}
    */
-  synchronized void journal(RequestInfo reqInfo, long firstTxnId,
+  synchronized void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     checkRequest(reqInfo);
     checkFormatted();
@@ -211,6 +212,21 @@ class Journal implements Closeable {
     // That way the node can catch back up and rejoin
     Preconditions.checkState(curSegment != null,
         "Can't write, no segment open");
+
+    if (curSegmentTxId != segmentTxId) {
+      // Sanity check: it is possible that the writer will fail IPCs
+      // on both the finalize() and then the start() of the next segment.
+      // This could cause us to continue writing to an old segment
+      // instead of rolling to a new one, which breaks one of the
+      // invariants in the design. If it happens, abort the segment
+      // and throw an exception.
+      curSegment.abort();
+      curSegment = null;
+      throw new IllegalStateException(
+          "Writer out of sync: it thinks it is writing segment " + segmentTxId
+          + " but current segment is " + curSegmentTxId);
+    }
+
     Preconditions.checkState(nextTxId == firstTxnId,
         "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Aug 15 18:58:51 2012
@@ -115,10 +115,11 @@ class JournalNodeRpcServer implements QJ
   }
 
   @Override
-  public void journal(RequestInfo reqInfo, long firstTxnId,
+  public void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     jn.getOrCreateJournal(reqInfo.getJournalId())
-       .journal(reqInfo, firstTxnId, numTxns, records);
+       .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
   }
 
   @Override
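The heart of the patch is the new check in Journal.journal(): if the writer's
notion of the open segment disagrees with the node's, the node aborts its
segment and rejects the write rather than silently extending a stale log. A
minimal standalone model of that invariant (hypothetical names; the real code
aborts curSegment and throws as shown in the hunk above):

    // Illustrative model only, not part of the patch.
    public class SegmentCheckDemo {
      static long curSegmentTxId = 6; // this node rolled to the segment starting at txid 6

      static void journal(long segmentTxId, long firstTxnId, int numTxns) {
        if (curSegmentTxId != segmentTxId) {
          // Stands in for curSegment.abort() plus the IllegalStateException
          // thrown in the patch.
          throw new IllegalStateException(
              "Writer out of sync: it thinks it is writing segment " + segmentTxId
              + " but current segment is " + curSegmentTxId);
        }
        System.out.println("accepted txns " + firstTxnId + ".."
            + (firstTxnId + numTxns - 1));
      }

      public static void main(String[] args) {
        journal(6, 6, 3); // writer and node agree: accepted
        try {
          journal(1, 4, 1); // writer missed the roll, still on segment 1
        } catch (IllegalStateException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }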
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Wed Aug 15 18:58:51 2012
@@ -58,6 +58,7 @@ message JournalRequestProto {
   required uint64 firstTxnId = 2;
   required uint32 numTxns = 3;
   required bytes records = 4;
+  required uint64 segmentTxnId = 5;
 }
 
 message JournalResponseProto {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Wed Aug 15 18:58:51 2012
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertFal
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+
+import com.google.common.collect.Lists;
 
 public abstract class QJMTestUtil {
   public static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
@@ -132,6 +136,23 @@ public abstract class QJMTestUtil {
     assertTrue("File " + fname + " should exist in a quorum of dirs",
         count >= cluster.getQuorumSize());
   }
-  
+
+  public static long recoverAndReturnLastTxn(QuorumJournalManager qjm)
+      throws IOException {
+    qjm.recoverUnfinalizedSegments();
+    long lastRecoveredTxn = 0;
+
+    List<EditLogInputStream> streams = Lists.newArrayList();
+    try {
+      qjm.selectInputStreams(streams, 0, false);
+
+      for (EditLogInputStream elis : streams) {
+        assertTrue(elis.getFirstTxId() > lastRecoveredTxn);
+        lastRecoveredTxn = elis.getLastTxId();
+      }
+    } finally {
+      IOUtils.cleanup(null, streams.toArray(new Closeable[0]));
+    }
+    return lastRecoveredTxn;
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Wed Aug 15 18:58:51 2012
@@ -78,9 +78,10 @@ public class TestIPCLoggerChannel {
 
   @Test
   public void testSimpleCall() throws Exception {
-    ch.sendEdits(1, 3, FAKE_DATA).get();
+    ch.sendEdits(1, 1, 3, FAKE_DATA).get();
     Mockito.verify(mockProxy).journal(Mockito.<RequestInfo>any(),
-        Mockito.eq(1L), Mockito.eq(3), Mockito.same(FAKE_DATA));
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(3), Mockito.same(FAKE_DATA));
   }
 
 
@@ -95,12 +96,13 @@ public class TestIPCLoggerChannel {
     DelayAnswer delayer = new DelayAnswer(LOG);
     Mockito.doAnswer(delayer).when(mockProxy).journal(
         Mockito.<RequestInfo>any(),
-        Mockito.eq(1L), Mockito.eq(1), Mockito.same(FAKE_DATA));
+        Mockito.eq(1L), Mockito.eq(1L),
+        Mockito.eq(1), Mockito.same(FAKE_DATA));
 
     // Queue up the maximum number of calls.
     int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
     for (int i = 1; i <= numToQueue; i++) {
-      ch.sendEdits((long)i, 1, FAKE_DATA);
+      ch.sendEdits(1L, (long)i, 1, FAKE_DATA);
     }
 
     // The accounting should show the correct total number queued.
@@ -108,7 +110,7 @@ public class TestIPCLoggerChannel {
 
     // Trying to queue any more should fail.
     try {
-      ch.sendEdits(numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
+      ch.sendEdits(1L, numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
       fail("Did not fail to queue more calls after queue was full");
     } catch (ExecutionException ee) {
       if (!(ee.getCause() instanceof LoggerTooFarBehindException)) {
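In the test changes that follow, every sendEdits() stub and verification
gains one leading long matcher for the new segmentTxId argument. A
representative stub, in the style of TestQuorumJournalManagerUnit and its
futureReturns() helper:

    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
        anyLong(),              // segmentTxId: stub accepts any segment
        eq(1L),                 // firstTxnId
        eq(2),                  // numTxns
        Mockito.<byte[]>any()); // records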
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Wed Aug 15 18:58:51 2012
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.qjo
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeSegment;
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.writeTxns;
 import static org.apache.hadoop.hdfs.qjournal.QJMTestUtil.verifyEdits;
+import static org.apache.hadoop.hdfs.qjournal.client.TestQuorumJournalManagerUnit.futureThrows;
 
 import java.io.Closeable;
 import java.io.File;
@@ -414,11 +415,60 @@ public class TestQuorumJournalManager {
 
   private void failLoggerAtTxn(AsyncLogger spy, long txid) {
     TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure"))
-      .when(spy).sendEdits(
+      .when(spy).sendEdits(Mockito.anyLong(),
           Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
   }
 
   /**
+   * Test the case where one of the loggers misses a finalizeLogSegment()
+   * call, and then misses the next startLogSegment() call before coming
+   * back to life.
+   *
+   * Previously, this caused it to keep on writing to the old log segment,
+   * such that one logger had eg edits_1-10 while the others had edits_1-5 and
+   * edits_6-10. This caused recovery to fail in certain cases.
+   */
+  @Test
+  public void testMissFinalizeAndNextStart() throws Exception {
+
+    // Logger 0: miss finalize(1-3) and start(4)
+    futureThrows(new IOException("injected")).when(spies.get(0))
+      .finalizeLogSegment(Mockito.eq(1L), Mockito.eq(3L));
+    futureThrows(new IOException("injected")).when(spies.get(0))
+      .startLogSegment(Mockito.eq(4L));
+
+    // Logger 1: fail at txn id 4
+    failLoggerAtTxn(spies.get(1), 4L);
+
+    writeSegment(cluster, qjm, 1, 3, true);
+    EditLogOutputStream stm = qjm.startLogSegment(4);
+    try {
+      writeTxns(stm, 4, 1);
+      fail("Did not fail to write");
+    } catch (QuorumException qe) {
+      // Should fail, because logger 1 had an injected fault and
+      // logger 0 should detect writer out of sync
+      GenericTestUtils.assertExceptionContains("Writer out of sync",
+          qe);
+    } finally {
+      stm.abort();
+      qjm.close();
+    }
+
+    // State:
+    // Logger 0: 1-3 in-progress (since it missed finalize)
+    // Logger 1: 1-3 finalized
+    // Logger 2: 1-3 finalized, 4 in-progress with one txn
+
+    // Shut down logger 2 so it doesn't participate in recovery
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
+    assertEquals(3L, recovered);
+  }
+
+  /**
    * edit lengths [3,4,5]
    * first recovery:
    * - sees [3,4,x]

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Wed Aug 15 18:58:51 2012
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.qjournal.client;
 
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
 
 import java.io.IOException;
@@ -150,21 +151,21 @@ public class TestQuorumJournalManagerUni
 
     // The flush should log txn 1-2
     futureReturns(null).when(spyLoggers.get(0)).sendEdits(
-        eq(1L), eq(2), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
     futureReturns(null).when(spyLoggers.get(1)).sendEdits(
-        eq(1L), eq(2), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
     futureReturns(null).when(spyLoggers.get(2)).sendEdits(
-        eq(1L), eq(2), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(2), Mockito.<byte[]>any());
     stm.flush();
 
     // Another flush should now log txn #3
     stm.setReadyToFlush();
     futureReturns(null).when(spyLoggers.get(0)).sendEdits(
-        eq(3L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
     futureReturns(null).when(spyLoggers.get(1)).sendEdits(
-        eq(3L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
     futureReturns(null).when(spyLoggers.get(2)).sendEdits(
-        eq(3L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(3L), eq(1), Mockito.<byte[]>any());
     stm.flush();
   }
 
@@ -176,14 +177,14 @@ public class TestQuorumJournalManagerUni
 
     // Make the first two logs respond immediately
     futureReturns(null).when(spyLoggers.get(0)).sendEdits(
-        eq(1L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
     futureReturns(null).when(spyLoggers.get(1)).sendEdits(
-        eq(1L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
 
     // And the third log not respond
     SettableFuture<Void> slowLog = SettableFuture.create();
     Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
-        eq(1L), eq(1), Mockito.<byte[]>any());
+        anyLong(), eq(1L), eq(1), Mockito.<byte[]>any());
     stm.flush();
   }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Wed Aug 15 18:58:51 2012
@@ -97,7 +97,7 @@ public class TestJournal {
     }
     try {
       journal.journal(new RequestInfo(JID, 1L, 1L),
-          100L, 0, new byte[0]);
+          12345L, 100L, 0, new byte[0]);
       fail("Should have rejected call from prior epoch");
     } catch (IOException ioe) {
       GenericTestUtils.assertExceptionContains(
@@ -109,7 +109,7 @@ public class TestJournal {
   public void testRestartJournal() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(new RequestInfo("j", 1, 1), 1);
-    journal.journal(new RequestInfo("j", 1, 2), 1, 2,
+    journal.journal(new RequestInfo("j", 1, 2), 1, 1, 2,
         QJMTestUtil.createTxnData(1, 2));
     // Don't finalize.
 
@@ -163,7 +163,7 @@ public class TestJournal {
   public void testFinalizeWhenEditsAreMissed() throws Exception {
     journal.newEpoch(FAKE_NSINFO, 1);
     journal.startLogSegment(makeRI(1), 1);
-    journal.journal(makeRI(2), 1, 3,
+    journal.journal(makeRI(2), 1, 1, 3,
         QJMTestUtil.createTxnData(1, 3));
 
     // Try to finalize up to txn 6, even though we only wrote up to txn 3.
@@ -220,7 +220,7 @@ public class TestJournal {
 
     // Start a segment at txid 1, and write a batch of 3 txns.
     journal.startLogSegment(makeRI(1), 1);
-    journal.journal(makeRI(2), 1, 3,
+    journal.journal(makeRI(2), 1, 1, 3,
         QJMTestUtil.createTxnData(1, 3));
 
     GenericTestUtils.assertExists(
@@ -229,7 +229,7 @@ public class TestJournal {
     // Try to start new segment at txid 6, this should abort old segment and
     // then succeed, allowing us to write txid 6-9.
     journal.startLogSegment(makeRI(3), 6);
-    journal.journal(makeRI(4), 6, 3,
+    journal.journal(makeRI(4), 6, 6, 3,
         QJMTestUtil.createTxnData(6, 3));
 
     // The old segment should *not* be finalized.
@@ -250,19 +250,19 @@ public class TestJournal {
     // Start a segment at txid 1, and write just 1 transaction. This
     // would normally be the START_LOG_SEGMENT transaction.
     journal.startLogSegment(makeRI(1), 1);
-    journal.journal(makeRI(2), 1, 1,
+    journal.journal(makeRI(2), 1, 1, 1,
        QJMTestUtil.createTxnData(1, 1));
 
     // Try to start new segment at txid 1, this should succeed, because
     // we are allowed to re-start a segment if we only ever had the
     // START_LOG_SEGMENT transaction logged.
     journal.startLogSegment(makeRI(3), 1);
-    journal.journal(makeRI(4), 1, 1,
+    journal.journal(makeRI(4), 1, 1, 1,
        QJMTestUtil.createTxnData(1, 1));
 
     // This time through, write more transactions afterwards, simulating
     // real user transactions.
-    journal.journal(makeRI(5), 2, 3,
+    journal.journal(makeRI(5), 1, 2, 3,
        QJMTestUtil.createTxnData(2, 3));
 
     try {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1373571&r1=1373570&r2=1373571&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Wed Aug 15 18:58:51 2012
@@ -91,7 +91,7 @@ public class TestJournalNode {
     ch.newEpoch(1).get();
     ch.setEpoch(1);
     ch.startLogSegment(1).get();
-    ch.sendEdits(1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+    ch.sendEdits(1L, 1, 1, "hello".getBytes(Charsets.UTF_8)).get();
   }
 
 
@@ -100,7 +100,7 @@ public class TestJournalNode {
     ch.newEpoch(1).get();
     ch.setEpoch(1);
     ch.startLogSegment(1).get();
-    ch.sendEdits(1, 2, QJMTestUtil.createTxnData(1, 2)).get();
+    ch.sendEdits(1L, 1, 2, QJMTestUtil.createTxnData(1, 2)).get();
 
     // Switch to a new epoch without closing earlier segment
     NewEpochResponseProto response = ch.newEpoch(2).get();
@@ -148,7 +148,7 @@ public class TestJournalNode {
     ch.newEpoch(1).get();
     ch.setEpoch(1);
     ch.startLogSegment(1).get();
-    ch.sendEdits(1, 3, EDITS_DATA).get();
+    ch.sendEdits(1L, 1, 3, EDITS_DATA).get();
     ch.finalizeLogSegment(1, 3).get();
 
     // Attempt to retrieve via HTTP, ensure we get the data back
@@ -199,7 +199,7 @@ public class TestJournalNode {
     // Make a log segment, and prepare again -- this time should see the
     // segment existing.
     ch.startLogSegment(1L).get();
-    ch.sendEdits(1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
+    ch.sendEdits(1L, 1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
 
     prep = ch.prepareRecovery(1L).get();
     System.err.println("Prep: " + prep);
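The end-to-end check for all of the above is testMissFinalizeAndNextStart()
together with the new QJMTestUtil.recoverAndReturnLastTxn() helper: after the
injected finalize/start failures, recovery must settle on the last txid that
reached a write quorum, as in the test's closing assertion:

    qjm = createSpyingQJM();
    long recovered = QJMTestUtil.recoverAndReturnLastTxn(qjm);
    assertEquals(3L, recovered); // txns beyond 3 never reached a quorum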