From: todd@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1383251 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/test/java/org/apache/hadoop/hdfs/qjournal/client/
Date: Tue, 11 Sep 2012 06:31:43 -0000
Message-Id: <20120911063143.98ED823888EA@eris.apache.org>

Author: todd
Date: Tue Sep 11 06:31:42 2012
New Revision: 1383251

URL: http://svn.apache.org/viewvc?rev=1383251&view=rev
Log:
HDFS-3906. QJM: quorum timeout on failover with large log segment. Contributed by Todd Lipcon.
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Tue Sep 11 06:31:42 2012
@@ -66,3 +66,5 @@ HDFS-3899. QJM: Add client-side metrics
 HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
 
 HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
+
+HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Sep 11 06:31:42 2012
@@ -413,10 +413,14 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
   public static final String DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
   public static final String DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
   public static final int DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
-  public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 20000;
-  public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 60000;
-  public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
   public static final int DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
 
 }
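
For operators, the two new keys are tunable like the existing ones. A minimal sketch (values here are illustrative only, not recommendations from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class QjmTimeoutTuning {
      public static void main(String[] args) {
        // Defaults after this commit are 120000 ms; raise them further for
        // clusters with very large unfinalized segments.
        Configuration conf = new Configuration();
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY, 120000);
        conf.setInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 180000);
        System.out.println("new-epoch timeout = " +
            conf.getInt(DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY, 120000));
      }
    }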
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Tue Sep 11 06:31:42 2012
@@ -52,8 +52,6 @@ import com.google.common.util.concurrent
 class AsyncLoggerSet {
   static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
 
-  private static final int NEWEPOCH_TIMEOUT_MS = 10000;
-  
   private final List<AsyncLogger> loggers;
 
   private static final long INVALID_EPOCH = -1;
@@ -63,44 +61,15 @@ class AsyncLoggerSet {
     this.loggers = ImmutableList.copyOf(loggers);
   }
   
-  /**
-   * Fence any previous writers, and obtain a unique epoch number
-   * for write-access to the journal nodes.
-   *
-   * @param nsInfo the expected namespace information. If the remote
-   * node does not match with this namespace, the request will be rejected.
-   * @return the new, unique epoch number
-   * @throws IOException
-   */
-  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch(
-      NamespaceInfo nsInfo) throws IOException {
-    Preconditions.checkState(myEpoch == -1,
-        "epoch already created: epoch=" + myEpoch);
-    
-    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
-        waitForWriteQuorum(getJournalState(), NEWEPOCH_TIMEOUT_MS);
-    
-    long maxPromised = Long.MIN_VALUE;
-    for (GetJournalStateResponseProto resp : lastPromises.values()) {
-      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
-    }
-    assert maxPromised >= 0;
-    
-    long myEpoch = maxPromised + 1;
-    Map<AsyncLogger, NewEpochResponseProto> resps =
-        waitForWriteQuorum(newEpoch(nsInfo, myEpoch), NEWEPOCH_TIMEOUT_MS);
-    this.myEpoch = myEpoch;
-    setEpoch(myEpoch);
-    return resps;
-  }
-  
-  private void setEpoch(long e) {
+  void setEpoch(long e) {
+    Preconditions.checkState(!isEpochEstablished(),
+        "Epoch already established: epoch=%s", myEpoch);
+    
+    myEpoch = e;
     for (AsyncLogger l : loggers) {
       l.setEpoch(e);
     }
   }
-  
   /**
    * Set the highest successfully committed txid seen by the writer.
    * This should be called after a successful write to a quorum, and is used
@@ -113,6 +82,13 @@ class AsyncLoggerSet {
   }
 
   /**
+   * @return true if an epoch has been established.
+   */
+  boolean isEpochEstablished() {
+    return myEpoch != INVALID_EPOCH;
+  }
+  
+  /**
    * @return the epoch number for this writer. This may only be called after
    * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
    */
@@ -143,19 +119,20 @@ class AsyncLoggerSet {
    * can't be achieved, throws a QuorumException.
    * @param q the quorum call
    * @param timeoutMs the number of millis to wait
+   * @param operationName textual description of the operation, for logging
    * @return a map of successful results
    * @throws QuorumException if a quorum doesn't respond with success
    * @throws IOException if the thread is interrupted or times out
    */
   <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
-      int timeoutMs) throws IOException {
+      int timeoutMs, String operationName) throws IOException {
     int majority = getMajoritySize();
     try {
       q.waitFor(
           loggers.size(), // either all respond 
          majority, // or we get a majority successes
          majority, // or we get a majority failures,
-          timeoutMs);
+          timeoutMs, operationName);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
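
A minimal sketch of the new invariant above (hypothetical caller; Guava's Preconditions.checkState throws IllegalStateException on a second establishment):

    // Hypothetical caller: the epoch may be set exactly once per logger set.
    // 'loggers' (a List<AsyncLogger>) is assumed to be in scope.
    AsyncLoggerSet als = new AsyncLoggerSet(loggers);
    if (!als.isEpochEstablished()) {
      als.setEpoch(1L);   // ok: first establishment
    }
    als.setEpoch(2L);     // would throw IllegalStateException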
@@ -227,7 +204,7 @@ class AsyncLoggerSet {
   // in a QuorumCall.
   ///////////////////////////////////////////////////////////////////////////
   
-  private QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
+  public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
     Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
         Maps.newHashMap();
     for (AsyncLogger logger : loggers) {
@@ -266,7 +243,7 @@ class AsyncLoggerSet {
     return QuorumCall.create(calls);
   }
   
-  private QuorumCall<AsyncLogger, NewEpochResponseProto> newEpoch(
+  public QuorumCall<AsyncLogger, NewEpochResponseProto> newEpoch(
       NamespaceInfo nsInfo,
       long epoch) {
     Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Tue Sep 11 06:31:42 2012
@@ -39,6 +39,23 @@ import com.google.protobuf.TextFormat;
 class QuorumCall<KEY, RESULT> {
   private final Map<KEY, RESULT> successes = Maps.newHashMap();
   private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+  
+  /**
+   * Interval, in milliseconds, at which a log message will be made
+   * while waiting for a quorum call.
+   */
+  private static final int WAIT_PROGRESS_INTERVAL_MILLIS = 1000;
+  
+  /**
+   * Start logging messages at INFO level periodically after waiting for
+   * this fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_INFO_THRESHOLD = 0.3f;
+  /**
+   * Start logging messages at WARN level after waiting for this
+   * fraction of the configured timeout for any call.
+   */
+  private static final float WAIT_PROGRESS_WARN_THRESHOLD = 0.7f;
 
   static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
       Map<KEY, ListenableFuture<RESULT>> calls) {
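
Concretely, with the 120000 ms defaults added in DFSConfigKeys above, the thresholds work out as follows (a quick worked example, not code from this commit):

    // Worked example of the progress-logging thresholds:
    int timeoutMs = 120000;
    long firstInfoAt = (long) (timeoutMs * 0.3f); // 36000 ms: INFO messages begin
    long warnFrom    = (long) (timeoutMs * 0.7f); // 84000 ms: messages escalate to WARN
    // ...repeating every WAIT_PROGRESS_INTERVAL_MILLIS (1000 ms) until the
    // call completes or the timeout expires.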
@@ -85,17 +102,35 @@ class QuorumCall<KEY, RESULT> {
    */
   public synchronized void waitFor(
       int minResponses, int minSuccesses, int maxExceptions,
-      int millis)
+      int millis, String operationName)
       throws InterruptedException, TimeoutException {
-    long et = Time.monotonicNow() + millis;
+    long st = Time.monotonicNow();
+    long nextLogTime = st + (long)(millis * WAIT_PROGRESS_INFO_THRESHOLD);
+    long et = st + millis;
     while (true) {
       if (minResponses > 0 && countResponses() >= minResponses) return;
       if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
       if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
-      long rem = et - Time.monotonicNow();
+      long now = Time.monotonicNow();
+      
+      if (now > nextLogTime) {
+        long waited = now - st;
+        String msg = String.format(
+            "Waited %s ms (timeout=%s ms) for a response for %s",
+            waited, millis, operationName);
+        if (waited > millis * WAIT_PROGRESS_WARN_THRESHOLD) {
+          QuorumJournalManager.LOG.warn(msg);
+        } else {
+          QuorumJournalManager.LOG.info(msg);
+        }
+        nextLogTime = now + WAIT_PROGRESS_INTERVAL_MILLIS;
+      }
+      long rem = et - now;
       if (rem <= 0) {
         throw new TimeoutException();
       }
+      rem = Math.min(rem, nextLogTime - now);
+      rem = Math.max(rem, 1);
       wait(rem);
     }
   }
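
For readers skimming the hunk above: the loop caps each wait() at the next scheduled log time, so progress messages appear even while the thread is blocked. A condensed, standalone sketch of the same pattern (done() and log() are stand-ins for the quorum predicate and the commons-logging calls; the real code uses a monotonic clock via Time.monotonicNow()):

    import java.util.concurrent.TimeoutException;

    // INFO after 30% of the timeout, WARN after 70%, once per second,
    // never oversleeping past the next scheduled progress message.
    public abstract class ProgressiveWait {
      protected abstract boolean done();                      // quorum condition (assumed)
      protected abstract void log(boolean warn, String msg);  // logger (assumed)

      public synchronized void waitWithProgress(long timeoutMs, String op)
          throws InterruptedException, TimeoutException {
        final long start = System.currentTimeMillis();  // real code: monotonic clock
        final long deadline = start + timeoutMs;
        long nextLog = start + (long) (timeoutMs * 0.3f);    // INFO threshold
        while (!done()) {
          long now = System.currentTimeMillis();
          if (now > nextLog) {
            long waited = now - start;
            log(waited > timeoutMs * 0.7f,                   // WARN threshold
                String.format("Waited %d ms (timeout=%d ms) for a response for %s",
                    waited, timeoutMs, op));
            nextLog = now + 1000;                            // once per second
          }
          long rem = deadline - now;
          if (rem <= 0) {
            throw new TimeoutException();
          }
          rem = Math.min(rem, nextLog - now);  // wake in time to log progress
          wait(Math.max(rem, 1));              // avoid wait(0), which waits forever
        }
      }
    }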
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Tue Sep 11 06:31:42 2012
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
@@ -52,6 +53,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.protobuf.TextFormat;
 
 /**
  * A JournalManager that writes to a set of remote JournalNodes,
@@ -67,6 +69,8 @@ public class QuorumJournalManager implem
   private final int acceptRecoveryTimeoutMs;
   private final int finalizeSegmentTimeoutMs;
   private final int selectInputStreamsTimeoutMs;
+  private final int getJournalStateTimeoutMs;
+  private final int newEpochTimeoutMs;
   
   // Since these don't occur during normal operation, we can
   // use rather lengthy timeouts, and don't need to make them
@@ -112,6 +116,13 @@ public class QuorumJournalManager implem
     this.selectInputStreamsTimeoutMs = conf.getInt(
         DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
         DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+    this.getJournalStateTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT);
+    this.newEpochTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT);
+    
   }
@@ -138,11 +149,43 @@ public class QuorumJournalManager implem
         "bad journal id: " + jid);
   }
 
+  
+  /**
+   * Fence any previous writers, and obtain a unique epoch number
+   * for write-access to the journal nodes.
+   *
+   * @return the new, unique epoch number
+   */
+  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch()
+      throws IOException {
+    Preconditions.checkState(!loggers.isEpochEstablished(),
+        "epoch already created");
+    
+    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+        loggers.waitForWriteQuorum(loggers.getJournalState(),
+            getJournalStateTimeoutMs, "getJournalState()");
+    
+    long maxPromised = Long.MIN_VALUE;
+    for (GetJournalStateResponseProto resp : lastPromises.values()) {
+      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+    }
+    assert maxPromised >= 0;
+    
+    long myEpoch = maxPromised + 1;
+    Map<AsyncLogger, NewEpochResponseProto> resps =
+        loggers.waitForWriteQuorum(loggers.newEpoch(nsInfo, myEpoch),
+            newEpochTimeoutMs, "newEpoch(" + myEpoch + ")");
+    
+    loggers.setEpoch(myEpoch);
+    return resps;
+  }
+  
   @Override
   public void format(NamespaceInfo nsInfo) throws IOException {
     QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo);
     try {
-      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS,
+          "format");
     } catch (InterruptedException e) {
       throw new IOException("Interrupted waiting for format() response");
     } catch (TimeoutException e) {
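
The fencing step itself is small: take the maximum lastPromisedEpoch a quorum reports and add one. A worked example with hypothetical responses:

    // Hypothetical lastPromisedEpoch values from three JournalNodes:
    long[] lastPromisedEpochs = { 3L, 4L, 4L };
    long maxPromised = Long.MIN_VALUE;
    for (long e : lastPromisedEpochs) {
      maxPromised = Math.max(maxPromised, e);
    }
    long myEpoch = maxPromised + 1;  // 5: strictly greater than any prior
                                     // promise, so newEpoch(5) fences any
                                     // writer still at epoch <= 4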
@@ -160,7 +203,7 @@ public class QuorumJournalManager implem
         loggers.isFormatted();
 
     try {
-      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS, "hasSomeData");
     } catch (InterruptedException e) {
       throw new IOException("Interrupted while determining if JNs have data");
     } catch (TimeoutException e) {
@@ -206,7 +249,8 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger, PrepareRecoveryResponseProto> prepare =
         loggers.prepareRecovery(segmentTxId);
     Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses=
-        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs);
+        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs,
+            "prepareRecovery(" + segmentTxId + ")");
     LOG.info("Recovery prepare phase complete. Responses:\n" +
         QuorumCall.mapToString(prepareResponses));
@@ -283,7 +327,8 @@ public class QuorumJournalManager implem
     URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
     
     QuorumCall<AsyncLogger, Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
-    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs);
+    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs,
+        "acceptRecovery(" + TextFormat.shortDebugString(logToSync) + ")");
 
     // TODO:
     // we should only try to finalize loggers who successfully synced above
@@ -292,7 +337,10 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger, Void> finalize =
         loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId());
-    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)",
+            logToSync.getStartTxId(),
+            logToSync.getEndTxId()));
   }
 
   static List<AsyncLogger> createLoggers(Configuration conf,
@@ -336,7 +384,8 @@ public class QuorumJournalManager implem
     Preconditions.checkState(isActiveWriter,
         "must recover segments before starting a new one");
     QuorumCall<AsyncLogger, Void> q = loggers.startLogSegment(txId);
-    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs,
+        "startLogSegment(" + txId + ")");
     return new QuorumOutputStream(loggers, txId);
   }
 
@@ -345,7 +394,8 @@ public class QuorumJournalManager implem
       throws IOException {
     QuorumCall<AsyncLogger, Void> q = loggers.finalizeLogSegment(
         firstTxId, lastTxId);
-    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs);
+    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs,
+        String.format("finalizeLogSegment(%s-%s)", firstTxId, lastTxId));
   }
 
   @Override
@@ -366,8 +416,7 @@ public class QuorumJournalManager implem
   public void recoverUnfinalizedSegments() throws IOException {
     Preconditions.checkState(!isActiveWriter, "already active writer");
     
-    Map<AsyncLogger, NewEpochResponseProto> resps =
-        loggers.createNewUniqueEpoch(nsInfo);
+    Map<AsyncLogger, NewEpochResponseProto> resps = createNewUniqueEpoch();
     LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
         QuorumCall.mapToString(resps));
@@ -399,7 +448,8 @@ public class QuorumJournalManager implem
     QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
         loggers.getEditLogManifest(fromTxnId);
     Map<AsyncLogger, RemoteEditLogManifest> resps =
-        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
+        loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
+            "selectInputStreams");
     LOG.debug("selectInputStream manifests:\n" +
         Joiner.on("\n").withKeyValueSeparator(": ").join(resps));

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Tue Sep 11 06:31:42 2012
@@ -101,7 +101,7 @@ class QuorumOutputStream extends EditLog
       QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
           segmentTxId, firstTxToFlush,
           numReadyTxns, data);
-      loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
+      loggers.waitForWriteQuorum(qcall, 20000, "sendEdits"); // TODO: configurable timeout
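
Note the sendEdits timeout is still hardcoded at 20000 ms per the TODO above. A hypothetical follow-up (the key name below is invented for illustration and is not part of this commit) could thread it through the same conf.getInt pattern as the other timeouts:

    // Hypothetical; not in r1383251. Mirrors the pattern used above.
    // In DFSConfigKeys:
    public static final String DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY =
        "dfs.qjournal.write-txns.timeout.ms";               // invented name
    public static final int DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;

    // In the QuorumJournalManager constructor:
    //   this.writeTxnsTimeoutMs = conf.getInt(
    //       DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
    //       DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
    // ...and at the sendEdits call site above:
    //   loggers.waitForWriteQuorum(qcall, writeTxnsTimeoutMs, "sendEdits");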
 
       // Since we successfully wrote this batch, let the loggers know. Any future
       // RPCs will thus let the loggers know of the most recent transaction, even

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1383251&r1=1383250&r2=1383251&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Tue Sep 11 06:31:42 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.qjournal.
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.List;
 import java.util.Random;
@@ -56,35 +57,43 @@ public class TestEpochsAreUnique {
       URI uri = cluster.getQuorumJournalURI(JID);
       QuorumJournalManager qjm = new QuorumJournalManager(
           conf, uri, FAKE_NSINFO);
-      qjm.format(FAKE_NSINFO);
+      try {
+        qjm.format(FAKE_NSINFO);
+      } finally {
+        qjm.close();
+      }
 
       try {
         // With no failures or contention, epochs should increase one-by-one
         for (int i = 0; i < 5; i++) {
-          AsyncLoggerSet als = new AsyncLoggerSet(
-              QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
-                  IPCLoggerChannel.FACTORY));
-          als.createNewUniqueEpoch(FAKE_NSINFO);
-          assertEquals(i + 1, als.getEpoch());
+          qjm = new QuorumJournalManager(
+              conf, uri, FAKE_NSINFO);
+          try {
+            qjm.createNewUniqueEpoch();
+            assertEquals(i + 1, qjm.getLoggerSetForTests().getEpoch());
+          } finally {
+            qjm.close();
+          }
         }
 
         long prevEpoch = 5;
         // With some failures injected, it should still always increase, perhaps
        // skipping some
        for (int i = 0; i < 20; i++) {
-          AsyncLoggerSet als = new AsyncLoggerSet(
-              makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO,
-                  IPCLoggerChannel.FACTORY)));
          long newEpoch = -1;
          while (true) {
+            qjm = new QuorumJournalManager(
+                conf, uri, FAKE_NSINFO, new FaultyLoggerFactory());
            try {
-              als.createNewUniqueEpoch(FAKE_NSINFO);
-              newEpoch = als.getEpoch();
+              qjm.createNewUniqueEpoch();
+              newEpoch = qjm.getLoggerSetForTests().getEpoch();
              break;
            } catch (IOException ioe) {
              // It's OK to fail to create an epoch, since we randomly inject
              // faults. It's possible we'll inject faults in too many of the
              // underlying nodes, and a failure is expected in that case
+            } finally {
+              qjm.close();
            }
          }
          LOG.info("Created epoch " + newEpoch);
It's possible we'll inject faults in too many of the // underlying nodes, and a failure is expected in that case + } finally { + qjm.close(); } } LOG.info("Created epoch " + newEpoch); @@ -97,20 +106,23 @@ public class TestEpochsAreUnique { } } - - private List makeFaulty(List loggers) { - List ret = Lists.newArrayList(); - for (AsyncLogger l : loggers) { - AsyncLogger spy = Mockito.spy(l); + private class FaultyLoggerFactory implements AsyncLogger.Factory { + @Override + public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, + String journalId, InetSocketAddress addr) { + AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger( + conf, nsInfo, journalId, addr); + AsyncLogger spy = Mockito.spy(ch); Mockito.doAnswer(new SometimesFaulty(0.10f)) .when(spy).getJournalState(); Mockito.doAnswer(new SometimesFaulty(0.40f)) .when(spy).newEpoch(Mockito.anyLong()); - ret.add(spy); + + return spy; } - return ret; + } - + private class SometimesFaulty implements Answer> { private float faultProbability; Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1383251&r1=1383250&r2=1383251&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Tue Sep 11 06:31:42 2012 @@ -42,8 +42,8 @@ public class TestQuorumCall { assertEquals(0, q.countResponses()); futures.get("f1").set("first future"); - q.waitFor(1, 0, 0, 100000); // wait for 1 response - q.waitFor(0, 1, 0, 100000); // wait for 1 success + q.waitFor(1, 0, 0, 100000, "test"); // wait for 1 response + q.waitFor(0, 1, 0, 100000, "test"); // wait for 1 success assertEquals(1, q.countResponses()); @@ -51,8 +51,8 @@ public class TestQuorumCall { assertEquals(2, q.countResponses()); futures.get("f3").set("second future"); - q.waitFor(3, 0, 100, 100000); // wait for 3 responses - q.waitFor(0, 2, 100, 100000); // 2 successes + q.waitFor(3, 0, 100, 100000, "test"); // wait for 3 responses + q.waitFor(0, 2, 100, 100000, "test"); // 2 successes assertEquals(3, q.countResponses()); assertEquals("f1=first future,f3=second future", @@ -60,7 +60,7 @@ public class TestQuorumCall { new TreeMap(q.getResults()))); try { - q.waitFor(0, 4, 100, 10); + q.waitFor(0, 4, 100, 10, "test"); fail("Didn't time out waiting for more responses than came back"); } catch (TimeoutException te) { // expected