Subject: svn commit: r1383137 - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/qjournal/client/ src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/ src/main/java/org/apache/hadoop/hdfs/qjournal/...
Date: Mon, 10 Sep 2012 22:30:53 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20120910223053.E247B2388980@eris.apache.org>

Author: todd
Date: Mon Sep 10 22:30:52 2012
New Revision: 1383137

URL: http://svn.apache.org/viewvc?rev=1383137&view=rev
Log:
HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Mon Sep 10 22:30:52 2012
@@ -58,3 +58,5 @@ HDFS-3898. QJM: enable TCP_NODELAY for I
 HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
 
 HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
+
+HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are out-of-sync (todd)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,7 @@ import org.apache.hadoop.security.Securi
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -92,6 +94,19 @@ public class IPCLoggerChannel implements
    * The highest txid that has been successfully logged on the remote JN.
    */
   private long highestAckedTxId = 0;
+
+  /**
+   * Nanotime of the last time we successfully journaled some edits
+   * to the remote node.
+   */
+  private long lastAckNanos = 0;
+
+  /**
+   * Nanotime of the last time that committedTxId was updated. Used
+   * to calculate the lag in terms of time, rather than just a number
+   * of txns.
+   */
+  private long lastCommitNanos = 0;
 
   /**
    * The maximum number of bytes that can be pending in the queue.
@@ -109,6 +124,13 @@ public class IPCLoggerChannel implements
    */
   private boolean outOfSync = false;
 
+  /**
+   * Stopwatch which starts counting on each heartbeat that is sent
+   */
+  private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
+
+  private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+
   static final Factory FACTORY = new AsyncLogger.Factory() {
     @Override
     public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
@@ -145,6 +167,7 @@ public class IPCLoggerChannel implements
         "Trying to move committed txid backwards in client " +
         "old: %s new: %s", committedTxId, txid);
     this.committedTxId = txid;
+    this.lastCommitNanos = System.nanoTime();
   }
 
   @Override
@@ -295,6 +318,11 @@ public class IPCLoggerChannel implements
     } catch (LoggerTooFarBehindException e) {
       return Futures.immediateFailedFuture(e);
     }
+
+    // When this batch is acked, we use its submission time in order
+    // to calculate how far we are lagging.
+    final long submitNanos = System.nanoTime();
+
     ListenableFuture ret = null;
     try {
       ret = executor.submit(new Callable() {
@@ -318,6 +346,7 @@ public class IPCLoggerChannel implements
           }
           synchronized (IPCLoggerChannel.this) {
             highestAckedTxId = firstTxnId + numTxns - 1;
+            lastAckNanos = submitNanos;
           }
           return null;
         }
@@ -347,15 +376,40 @@ public class IPCLoggerChannel implements
     return ret;
   }
 
-  private synchronized void throwIfOutOfSync() throws JournalOutOfSyncException {
-    if (outOfSync) {
-      // TODO: send a "heartbeat" here so that the remote node knows the newest
-      // committed txid, for metrics purposes
+  private void throwIfOutOfSync()
+      throws JournalOutOfSyncException, IOException {
+    if (isOutOfSync()) {
+      // Even if we're out of sync, it's useful to send an RPC
+      // to the remote node in order to update its lag metrics, etc.
+      heartbeatIfNecessary();
       throw new JournalOutOfSyncException(
           "Journal disabled until next roll");
     }
   }
 
+  /**
+   * When we've entered an out-of-sync state, it's still useful to periodically
+   * send an empty RPC to the server, such that it has the up to date
+   * committedTxId. This acts as a sanity check during recovery, and also allows
+   * that node's metrics to be up-to-date about its lag.
+   *
+   * In the future, this method may also be used in order to check that the
+   * current node is still the current writer, even if no edits are being
+   * written.
+   */
+  private void heartbeatIfNecessary() throws IOException {
+    if (lastHeartbeatStopwatch.elapsedMillis() > HEARTBEAT_INTERVAL_MILLIS ||
+        !lastHeartbeatStopwatch.isRunning()) {
+      try {
+        getProxy().heartbeat(createReqInfo());
+      } finally {
+        // Don't send heartbeats more often than the configured interval,
+        // even if they fail.
+        lastHeartbeatStopwatch.reset().start();
+      }
+    }
+  }
+
   private synchronized void reserveQueueSpace(int size)
       throws LoggerTooFarBehindException {
     Preconditions.checkArgument(size >= 0);
@@ -479,13 +533,27 @@ public class IPCLoggerChannel implements
   @Override
   public synchronized void appendHtmlReport(StringBuilder sb) {
     sb.append("Written txid ").append(highestAckedTxId);
-    long behind = committedTxId - highestAckedTxId;
-    assert behind >= 0;
+    long behind = getLagTxns();
     if (behind > 0) {
-      sb.append(" (" + behind + " behind)");
+      if (lastAckNanos != 0) {
+        long lagMillis = getLagTimeMillis();
+        sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
+      } else {
+        sb.append(" (never written)");
+      }
     }
     if (outOfSync) {
-      sb.append(" (will re-join on next segment)");
+      sb.append(" (will try to re-sync on next segment)");
     }
   }
+
+  private long getLagTxns() {
+    return Math.max(committedTxId - highestAckedTxId, 0);
+  }
+
+  private long getLagTimeMillis() {
+    return TimeUnit.MILLISECONDS.convert(
+        Math.max(lastCommitNanos - lastAckNanos, 0),
+        TimeUnit.NANOSECONDS);
+  }
 }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Mon Sep 10 22:30:52 2012
@@ -77,6 +77,15 @@ public interface QJournalProtocol {
                       int numTxns,
                       byte[] records) throws IOException;
 
+
+  /**
+   * Heartbeat.
+   * This is a no-op on the server, except that it verifies that the
+   * caller is in fact still the active writer, and provides up-to-date
+   * information on the most recently committed txid.
+   */
+  public void heartbeat(RequestInfo reqInfo) throws IOException;
+
   /**
    * Start writing to a new log segment on the JournalNode.
    * Before calling this, one should finalize the previous segment

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalResponseProto;
@@ -118,6 +120,18 @@ public class QJournalProtocolServerSideT
     return JournalResponseProto.newBuilder().build();
   }
 
+  /** @see JournalProtocol#heartbeat */
+  @Override
+  public HeartbeatResponseProto heartbeat(RpcController controller,
+      HeartbeatRequestProto req) throws ServiceException {
+    try {
+      impl.heartbeat(convert(req.getReqInfo()));
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    return HeartbeatResponseProto.getDefaultInstance();
+  }
+
   /** @see JournalProtocol#startLogSegment */
   @Override
   public StartLogSegmentResponseProto startLogSegment(RpcController controller,

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java Mon Sep 10 22:30:52 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.qjournal.p
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.HeartbeatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalIdProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochRequestProto;
@@ -141,6 +142,17 @@ public class QJournalProtocolTranslatorP
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    try {
+      rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+          .setReqInfo(convert(reqInfo))
+          .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   private QJournalProtocolProtos.RequestInfoProto convert(
       RequestInfo reqInfo) {

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Mon Sep 10 22:30:52 2012
@@ -75,6 +75,7 @@ class Journal implements Closeable {
   private EditLogOutputStream curSegment;
   private long curSegmentTxId = HdfsConstants.INVALID_TXID;
   private long nextTxId = HdfsConstants.INVALID_TXID;
+  private long highestWrittenTxId = 0;
 
   private final String journalId;
 
@@ -123,6 +124,11 @@ class Journal implements Closeable {
     this.fjm = storage.getJournalManager();
 
     this.metrics = JournalMetrics.create(this);
+
+    EditLogFile latest = scanStorageForLatestEdits();
+    if (latest != null) {
+      highestWrittenTxId = latest.getLastTxId();
+    }
   }
 
   /**
@@ -224,6 +230,19 @@ class Journal implements Closeable {
     return committedTxnId.get();
   }
 
+  synchronized long getCurrentLagTxns() throws IOException {
+    long committed = committedTxnId.get();
+    if (committed == 0) {
+      return 0;
+    }
+
+    return Math.max(committed - highestWrittenTxId, 0L);
+  }
+
+  synchronized long getHighestWrittenTxId() {
+    return highestWrittenTxId;
+  }
+
   @VisibleForTesting
   JournalMetrics getMetricsForTests() {
     return metrics;
@@ -329,19 +348,20 @@ class Journal implements Closeable {
       // This batch of edits has already been committed on a quorum of other
       // nodes. So, we are in "catch up" mode. This gets its own metric.
       metrics.batchesWrittenWhileLagging.incr(1);
-      metrics.currentLagTxns.set(committedTxnId.get() - lastTxnId);
-    } else {
-      metrics.currentLagTxns.set(0L);
     }
 
     metrics.batchesWritten.incr(1);
     metrics.bytesWritten.incr(records.length);
     metrics.txnsWritten.incr(numTxns);
-    metrics.lastWrittenTxId.set(lastTxnId);
 
-    nextTxId += numTxns;
+    highestWrittenTxId = lastTxnId;
+    nextTxId = lastTxnId + 1;
   }
 
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    checkRequest(reqInfo);
+  }
+
   /**
    * Ensure that the given request is coming from the correct writer and in-order.
    * @param reqInfo the request info
@@ -690,6 +710,10 @@ class Journal implements Closeable {
       if (currentSegment == null) {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": no current segment in place");
+
+        // Update the highest txid for lag metrics
+        highestWrittenTxId = Math.max(segment.getEndTxId(),
+            highestWrittenTxId);
       } else {
         LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
             ": old segment " + TextFormat.shortDebugString(currentSegment) +
@@ -708,8 +732,15 @@ class Journal implements Closeable {
               ": would discard already-committed txn " +
               committedTxnId.get());
         }
+
+        // If we're shortening the log, update our highest txid
+        // used for lag metrics.
+        if (txnRange(currentSegment).contains(highestWrittenTxId)) {
+          highestWrittenTxId = segment.getEndTxId();
+        }
       }
       syncLog(reqInfo, segment, fromUrl);
+
     } else {
       LOG.info("Skipping download of log " +
           TextFormat.shortDebugString(segment) +

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Mon Sep 10 22:30:52 2012
@@ -51,12 +51,6 @@ class JournalMetrics {
 
   MutableQuantiles[] syncsQuantiles;
 
-  @Metric("Transaction lag behind the most recent commit")
-  MutableGaugeLong currentLagTxns;
-
-  @Metric("Last written txid")
-  MutableGaugeLong lastWrittenTxId;
-
   private final Journal journal;
 
   JournalMetrics(Journal journal) {
@@ -99,6 +93,20 @@ class JournalMetrics {
     }
   }
 
+  @Metric("The highest txid stored on this JN")
+  public long getLastWrittenTxId() {
+    return journal.getHighestWrittenTxId();
+  }
+
+  @Metric("Number of transactions that this JN is lagging")
+  public long getCurrentLagTxns() {
+    try {
+      return journal.getCurrentLagTxns();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+
   void addSync(long us) {
     for (MutableQuantiles q : syncsQuantiles) {
       q.add(us);

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Mon Sep 10 22:30:52 2012
@@ -137,6 +137,12 @@ class JournalNodeRpcServer implements QJ
     jn.getOrCreateJournal(reqInfo.getJournalId())
        .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
   }
+
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .heartbeat(reqInfo);
+  }
 
   @Override
   public void startLogSegment(RequestInfo reqInfo, long txid)

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Mon Sep 10 22:30:52 2012
@@ -72,6 +72,17 @@ message JournalResponseProto {
 }
 
 /**
+ * heartbeat()
+ */
+
+message HeartbeatRequestProto {
+  required RequestInfoProto reqInfo = 1;
+}
+
+message HeartbeatResponseProto { // void response
+}
+
+/**
  * startLogSegment()
  */
 message StartLogSegmentRequestProto {
@@ -207,6 +218,8 @@ service QJournalProtocolService {
 
   rpc journal(JournalRequestProto) returns (JournalResponseProto);
 
+  rpc heartbeat(HeartbeatRequestProto) returns (HeartbeatResponseProto);
+
   rpc startLogSegment(StartLogSegmentRequestProto)
       returns (StartLogSegmentResponseProto);
 

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Mon Sep 10 22:30:52 2012
@@ -22,8 +22,10 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -193,6 +195,9 @@ public class TestNNWithQJM {
         MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         mjc.getQuorumJournalURI("myjournal").toString());
+    // Speed up the test
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(0)
@@ -217,7 +222,18 @@ public class TestNNWithQJM {
 
       contents = DFSTestUtil.urlGet(url);
       System.out.println(contents);
-      assertTrue(contents.contains("(1 behind)"));
+      assertTrue(Pattern.compile("1 txns/\\d+ms behind").matcher(contents)
+          .find());
+
+      // Restart NN while JN0 is still down.
+      cluster.restartNameNode();
+
+      contents = DFSTestUtil.urlGet(url);
+      System.out.println(contents);
+      assertTrue(Pattern.compile("never written").matcher(contents)
+          .find());
+
+
     } finally {
       cluster.shutdown();
     }

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Mon Sep 10 22:30:52 2012
@@ -163,11 +163,14 @@ public class TestIPCLoggerChannel {
           ee.getCause());
     }
 
-    // It should have failed without even sending an RPC, since it was not sync.
+    // It should have failed without even sending the edits, since it was not in sync.
     Mockito.verify(mockProxy, Mockito.never()).journal(
         Mockito.any(),
         Mockito.eq(1L), Mockito.eq(2L),
         Mockito.eq(1), Mockito.same(FAKE_DATA));
+    // It should have sent a heartbeat instead.
+    Mockito.verify(mockProxy).heartbeat(
+        Mockito.any());
 
     // After a roll, sending new edits should not fail.
     ch.startLogSegment(3L).get();

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1383137&r1=1383136&r2=1383137&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Mon Sep 10 22:30:52 2012
@@ -101,6 +101,7 @@ public class TestJournalNode {
         journal.getMetricsForTests().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 0L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
 
     IPCLoggerChannel ch = new IPCLoggerChannel(
         conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
@@ -113,6 +114,17 @@ public class TestJournalNode {
         journal.getMetricsForTests().getName());
     MetricsAsserts.assertCounter("BatchesWritten", 1L, metrics);
     MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 0L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 0L, metrics);
+
+    ch.setCommittedTxId(100L);
+    ch.sendEdits(1L, 2, 1, "goodbye".getBytes(Charsets.UTF_8)).get();
+
+    metrics = MetricsAsserts.getMetrics(
+        journal.getMetricsForTests().getName());
+    MetricsAsserts.assertCounter("BatchesWritten", 2L, metrics);
+    MetricsAsserts.assertCounter("BatchesWrittenWhileLagging", 1L, metrics);
+    MetricsAsserts.assertGauge("CurrentLagTxns", 98L, metrics);
+
   }
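For readers tracing the client-side changes to IPCLoggerChannel, the following is a minimal, dependency-free sketch of its two ideas: rate-limiting heartbeats while a JN is out of sync (the timer restarts even when the RPC fails), and reporting lag both in transactions and in time (time of the latest commit minus the submission time of the last acked batch). It is not Hadoop code. The class name LagTrackerSketch, the injected nanosecond clock, and the Runnable standing in for the heartbeat RPC are hypothetical, and it avoids Guava's Stopwatch so that it runs with the JDK alone.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not the Hadoop implementation) of the lag and
 * heartbeat bookkeeping described in HDFS-3901.
 */
final class LagTrackerSketch {
  private final LongSupplier nanoClock;   // e.g. System::nanoTime; injectable for tests
  private final long heartbeatIntervalNanos;
  private final Runnable heartbeatRpc;    // hypothetical stand-in for the heartbeat RPC

  private long committedTxId = 0;
  private long highestAckedTxId = 0;
  private long lastCommitNanos = 0;
  private long lastAckNanos = 0;
  private boolean everAcked = false;      // explicit flag: a clock reading of 0 is legal
  private boolean heartbeatSent = false;  // nanoTime can be negative, so no sentinel value
  private long lastHeartbeatNanos = 0;

  LagTrackerSketch(LongSupplier nanoClock, long intervalMillis, Runnable heartbeatRpc) {
    this.nanoClock = nanoClock;
    this.heartbeatIntervalNanos = TimeUnit.MILLISECONDS.toNanos(intervalMillis);
    this.heartbeatRpc = heartbeatRpc;
  }

  /** Records a newer committed txid and the time we learned about it. */
  synchronized void setCommittedTxId(long txid) {
    if (txid < committedTxId) {
      throw new IllegalArgumentException(
          "committed txid moved backwards: " + committedTxId + " -> " + txid);
    }
    committedTxId = txid;
    lastCommitNanos = nanoClock.getAsLong();
  }

  /** Records an ack, stamped with the time the batch was submitted. */
  synchronized void onAck(long lastTxIdInBatch, long submitNanos) {
    highestAckedTxId = lastTxIdInBatch;
    lastAckNanos = submitNanos;
    everAcked = true;
  }

  /**
   * Calls the heartbeat if none was sent yet or more than the interval has
   * elapsed. The timer restarts in a finally block, so a failing RPC is not
   * retried more often than the interval; its exception still propagates.
   * Returns true if the heartbeat was called and completed normally.
   */
  synchronized boolean heartbeatIfNecessary() {
    long now = nanoClock.getAsLong();
    if (heartbeatSent && now - lastHeartbeatNanos <= heartbeatIntervalNanos) {
      return false;
    }
    try {
      heartbeatRpc.run();
    } finally {
      heartbeatSent = true;
      lastHeartbeatNanos = now;
    }
    return true;
  }

  /** Lag in transactions, never negative. */
  synchronized long lagTxns() {
    return Math.max(committedTxId - highestAckedTxId, 0);
  }

  /** Lag in milliseconds between the latest commit and the last acked submission. */
  synchronized long lagMillis() {
    return TimeUnit.NANOSECONDS.toMillis(Math.max(lastCommitNanos - lastAckNanos, 0));
  }

  /** Builds a report suffix shaped like the one appended in appendHtmlReport. */
  synchronized String describeLag() {
    long behind = lagTxns();
    if (behind == 0) {
      return "";
    }
    if (!everAcked) {
      return " (never written)";
    }
    return " (" + behind + " txns/" + lagMillis() + "ms behind)";
  }
}
```

With a fake clock, committing txid 100 at t=2000ms after acking a batch ending at txid 2 that was submitted at t=1500ms yields a lag of 98 txns and 500ms, the same 98-transaction arithmetic that TestJournalNode asserts on the server-side gauge. Restarting the timer in a finally block follows the comment in heartbeatIfNecessary(): a JN that keeps failing is probed at most once per interval rather than on every sendEdits call.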