hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1417596 [1/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Date Wed, 05 Dec 2012 19:22:25 GMT
Author: todd
Date: Wed Dec  5 19:22:17 2012
New Revision: 1417596

URL: http://svn.apache.org/viewvc?rev=1417596&view=rev
Log:
Merge HDFS-3077 (QuorumJournalManager) backport into branch-2.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/SegmentRecoveryComparator.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalOutOfSyncException.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/BestEffortLongFile.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestSegmentRecoveryComparator.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestBestEffortLongFile.java
Removed:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSHighAvailability.apt.vm
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/dfshealth.jsp
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileOutputStream.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRecovery.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestInitializeSharedEdits.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Dec  5 19:22:17 2012
@@ -22,6 +22,10 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4213. Add an API to hsync for updating the last block length at the
     namenode. (Jing Zhao via szetszwo)
 
+    HDFS-3077. Implement QuorumJournalManager, a distributed mechanism for
+    reliably storing HDFS edit logs. See the dedicated section below for a
+    breakdown of subtasks.
+
   IMPROVEMENTS
   
     HDFS-3925. Prettify PipelineAck#toString() for printing to a log
@@ -282,6 +286,108 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4231. BackupNode: Introduce BackupState. (shv)
 
+  BREAKDOWN OF HDFS-3077 SUBTASKS
+
+    HDFS-3077. Quorum-based protocol for reading and writing edit logs.
+    (todd, Brandon Li, and Hari Mankude via todd)
+    
+    HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
+    
+    HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs
+    (todd)
+    
+    HDFS-3693. JNStorage should read its storage info even before a writer
+    becomes active (todd)
+    
+    HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
+    
+    HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
+    
+    HDFS-3773. TestNNWithQJM fails after HDFS-3741. (atm)
+    
+    HDFS-3793. Implement genericized format() in QJM (todd)
+    
+    HDFS-3795. QJM: validate journal dir at startup (todd)
+    
+    HDFS-3798. Avoid throwing NPE when finalizeSegment() is called on invalid
+    segment (todd)
+    
+    HDFS-3799. QJM: handle empty log segments during recovery (todd)
+    
+    HDFS-3797. QJM: add segment txid as a parameter to journal() RPC (todd)
+    
+    HDFS-3800. improvements to QJM fault testing (todd)
+    
+    HDFS-3823. QJM: TestQJMWithFaults fails occasionally because of missed
+    setting of HTTP port. (todd and atm)
+    
+    HDFS-3826. QJM: Some trivial logging / exception text improvements. (todd
+    and atm)
+    
+    HDFS-3839. QJM: hadoop-daemon.sh should be updated to accept "journalnode"
+    (eli)
+    
+    HDFS-3845. Fixes for edge cases in QJM recovery protocol (todd)
+    
+    HDFS-3877. QJM: Provide defaults for dfs.journalnode.*address (eli)
+    
+    HDFS-3863. Track last "committed" txid in QJM (todd)
+    
+    HDFS-3869. Expose non-file journal manager details in web UI (todd)
+    
+    HDFS-3884. Journal format() should reset cached values (todd)
+    
+    HDFS-3870. Add metrics to JournalNode (todd)
+    
+    HDFS-3891. Make selectInputStreams throw IOE instead of RTE (todd)
+    
+    HDFS-3726. If a logger misses an RPC, don't retry that logger until next
+    segment (todd)
+    
+    HDFS-3893. QJM: Make QJM work with security enabled. (atm)
+    
+    HDFS-3897. QJM: TestBlockToken fails after HDFS-3893. (atm)
+    
+    HDFS-3898. QJM: enable TCP_NODELAY for IPC (todd)
+    
+    HDFS-3885. QJM: optimize log sync when JN is lagging behind (todd)
+    
+    HDFS-3900. QJM: avoid validating log segments on log rolls (todd)
+    
+    HDFS-3901. QJM: send 'heartbeat' messages to JNs even when they are
+    out-of-sync (todd)
+    
+    HDFS-3899. QJM: Add client-side metrics (todd)
+    
+    HDFS-3914. QJM: acceptRecovery should abort current segment (todd)
+    
+    HDFS-3915. QJM: Failover fails with auth error in secure cluster (todd)
+    
+    HDFS-3906. QJM: quorum timeout on failover with large log segment (todd)
+    
+    HDFS-3840. JournalNodes log JournalNotFormattedException backtrace error
+    before being formatted (todd)
+    
+    HDFS-3894. QJM: testRecoverAfterDoubleFailures can be flaky due to IPC
+    client caching (todd)
+    
+    HDFS-3926. QJM: Add user documentation for QJM. (atm)
+    
+    HDFS-3943. QJM: remove currently-unused md5sum field (todd)
+    
+    HDFS-3950. QJM: misc TODO cleanup, improved log messages, etc. (todd)
+    
+    HDFS-3955. QJM: Make acceptRecovery() atomic. (todd)
+    
+    HDFS-3956. QJM: purge temporary files when no longer within retention
+    period (todd)
+    
+    HDFS-4004. TestJournalNode#testJournal fails because of test case execution
+    order (Chao Shi via todd)
+    
+    HDFS-4017. Unclosed FileInputStream in GetJournalEditServlet
+    (Chao Shi via todd)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

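For context on what the backported feature looks like in use (a sketch, not part of this patch): once a JournalNode quorum is up, the NameNode is pointed at it through its shared-edits URI. The hostnames and nameservice name below are illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class QjmSharedEditsExample {
      public static void main(String[] args) {
        // Point the NameNode's shared edits dir at a 3-node JournalNode
        // quorum; port 8485 matches DFS_JOURNALNODE_RPC_PORT_DEFAULT below.
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.namenode.shared.edits.dir",
            "qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster");
        System.out.println(conf.get("dfs.namenode.shared.edits.dir"));
      }
    }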
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Wed Dec  5 19:22:17 2012
@@ -9,6 +9,9 @@
        <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
      </Match>
      <Match>
+       <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
+     </Match>
+     <Match>
        <Bug pattern="EI_EXPOSE_REP" />
      </Match>
      <Match>
@@ -273,10 +276,18 @@
        <Method name="quit" />
        <Bug pattern="DM_EXIT" />
      </Match>
+
      <!-- Don't complain about recoverBlock equality check -->
      <Match>
        <Class name="org.apache.hadoop.hdfs.server.datanode.DataNode" />
        <Method name="recoverBlock" />
        <Bug pattern="EC_UNRELATED_TYPES" />
      </Match>
+
+     <!-- More complex cleanup logic confuses findbugs -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.qjournal.server.Journal" />
+       <Method name="persistPaxosData" />
+       <Bug pattern="OS_OPEN_STREAM" />
+     </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/pom.xml Wed Dec  5 19:22:17 2012
@@ -235,6 +235,25 @@ http://maven.apache.org/xsd/maven-4.0.0.
             </configuration>
           </execution>
           <execution>
+            <id>journal</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <compile>false</compile>
+              <workingDirectory>${project.build.directory}/generated-src/main/jsp</workingDirectory>
+              <webFragmentFile>${project.build.directory}/journal-jsp-servlet-definitions.xml</webFragmentFile>
+              <packageName>org.apache.hadoop.hdfs.server.journalservice</packageName>
+              <sources>
+                <directory>${basedir}/src/main/webapps/journal</directory>
+                <includes>
+                  <include>*.jsp</include>
+                </includes>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
             <id>datanode</id>
             <phase>generate-sources</phase>
             <goals>
@@ -321,6 +340,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
                 <loadfile property="hdfs.servlet.definitions" srcFile="${project.build.directory}/hdfs-jsp-servlet-definitions.xml"/>
                 <loadfile property="secondary.servlet.definitions" srcFile="${project.build.directory}/secondary-jsp-servlet-definitions.xml"/>
                 <loadfile property="datanode.servlet.definitions" srcFile="${project.build.directory}/datanode-jsp-servlet-definitions.xml"/>
+                <loadfile property="journal.servlet.definitions" srcFile="${project.build.directory}/journal-jsp-servlet-definitions.xml"/>               
                 <echoproperties destfile="${project.build.directory}/webxml.properties">
                   <propertyset>
                     <propertyref regex=".*.servlet.definitions"/>
@@ -336,6 +356,9 @@ http://maven.apache.org/xsd/maven-4.0.0.
                 <copy file="${basedir}/src/main/webapps/proto-datanode-web.xml"
                       tofile="${project.build.directory}/webapps/datanode/WEB-INF/web.xml"
                       filtering="true"/>
+                <copy file="${basedir}/src/main/webapps/proto-journal-web.xml"
+                      tofile="${project.build.directory}/webapps/journal/WEB-INF/web.xml"
+                      filtering="true"/>
                 <copy toDir="${project.build.directory}/webapps">
                   <fileset dir="${basedir}/src/main/webapps">
                     <exclude name="**/*.jsp"/>
@@ -437,6 +460,21 @@ http://maven.apache.org/xsd/maven-4.0.0.
               </arguments>
             </configuration>
           </execution>
+          <execution>
+            <id>compile-proto-qjournal</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--java_out=target/generated-sources/java</argument>
+                <argument>src/main/proto/QJournalProtocol.proto</argument>
+              </arguments>
+            </configuration>
+          </execution>
         </executions>
       </plugin>
       <plugin>

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogOutputStream.java Wed Dec  5 19:22:17 2012
@@ -84,7 +84,7 @@ class BookKeeperEditLogOutputStream
   @Override
   public void close() throws IOException {
     setReadyToFlush();
-    flushAndSync();
+    flushAndSync(true);
     try {
       lh.close();
     } catch (InterruptedException ie) {
@@ -130,7 +130,7 @@ class BookKeeperEditLogOutputStream
   }
 
   @Override
-  public void flushAndSync() throws IOException {
+  public void flushAndSync(boolean durable) throws IOException {
     assert(syncLatch != null);
     try {
       syncLatch.await();

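The signature change above threads a durability flag through the EditLogOutputStream hierarchy (HDFS-3885 in the subtask list). A minimal sketch of the assumed contract, with comments that are mine rather than the patch's:

    // stream is any EditLogOutputStream, e.g. the BookKeeper one above.
    stream.setReadyToFlush();
    // durable == true: do not return until the edits reach stable storage;
    // close() above now passes true for exactly this reason. Passing false
    // would let the stream skip the expensive sync when durability is not
    // required at that point.
    stream.flushAndSync(true);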
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Wed Dec  5 19:22:17 2012
@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  journalnode          run the DFS journalnode"
   echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
@@ -90,6 +91,9 @@ elif [ "$COMMAND" = "datanode" ] ; then
   else
     HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
   fi
+elif [ "$COMMAND" = "journalnode" ] ; then
+  CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
 elif [ "$COMMAND" = "dfs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Dec  5 19:22:17 2012
@@ -409,4 +409,42 @@ public class DFSConfigKeys extends Commo
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_DATA_ENCRYPTION_ALGORITHM_KEY = "dfs.encrypt.data.transfer.algorithm";
+  
+  // Journal-node related configs. These are read on the JN side.
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+  public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+    
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+  public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+  public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+  public static final String  DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+  public static final String  DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+  // Journal-node related configs for the client side.
+  public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+  public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+  
+  // Quorum-journal timeouts for various operations. Unlikely to need
+  // to be tweaked, but configurable just in case.
+  public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final String  DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_KEY = "dfs.qjournal.get-journal-state.timeout.ms";
+  public static final String  DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_KEY = "dfs.qjournal.new-epoch.timeout.ms";
+  public static final String  DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY = "dfs.qjournal.write-txns.timeout.ms";
+  public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_GET_JOURNAL_STATE_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_NEW_EPOCH_TIMEOUT_DEFAULT = 120000;
+  public static final int     DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT = 20000;
 }

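A minimal sketch (assumed, not shown in this hunk) of how a client resolves one of these timeout keys; each *_KEY pairs with its *_DEFAULT via Configuration.getInt():

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class QjmTimeoutExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Operators override "dfs.qjournal.write-txns.timeout.ms" in
        // hdfs-site.xml; otherwise the 20000 ms default applies.
        int writeTxnsTimeoutMs = conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_WRITE_TXNS_TIMEOUT_DEFAULT);
        System.out.println("write-txns timeout: " + writeTxnsTimeoutMs + " ms");
      }
    }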
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Dec  5 19:22:17 2012
@@ -81,6 +81,7 @@ import org.apache.hadoop.ipc.ProtobufRpc
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -494,6 +495,34 @@ public class DFSUtil {
   }
 
   /**
+   * @return a collection of all configured NN Kerberos principals.
+   */
+  public static Set<String> getAllNnPrincipals(Configuration conf) throws IOException {
+    Set<String> principals = new HashSet<String>();
+    for (String nsId : DFSUtil.getNameServiceIds(conf)) {
+      if (HAUtil.isHAEnabled(conf, nsId)) {
+        for (String nnId : DFSUtil.getNameNodeIds(conf, nsId)) {
+          Configuration confForNn = new Configuration(conf);
+          NameNode.initializeGenericKeys(confForNn, nsId, nnId);
+          String principal = SecurityUtil.getServerPrincipal(confForNn
+              .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+              NameNode.getAddress(confForNn).getHostName());
+          principals.add(principal);
+        }
+      } else {
+        Configuration confForNn = new Configuration(conf);
+        NameNode.initializeGenericKeys(confForNn, nsId, null);
+        String principal = SecurityUtil.getServerPrincipal(confForNn
+            .get(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY),
+            NameNode.getAddress(confForNn).getHostName());
+        principals.add(principal);
+      }
+    }
+
+    return principals;
+  }
+
+  /**
    * Returns list of InetSocketAddress corresponding to HA NN RPC addresses from
    * the configuration.
    * 

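A hedged usage sketch for the new helper: a JournalNode-side server can vet callers against the set of configured NameNode principals. The variable remotePrincipal below is a hypothetical stand-in for the authenticated caller's Kerberos principal.

    Set<String> nnPrincipals = DFSUtil.getAllNnPrincipals(conf);
    if (!nnPrincipals.contains(remotePrincipal)) {
      throw new IOException(
          "Rejecting request: only a NameNode principal may access the journal");
    }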
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Dec  5 19:22:17 2012
@@ -23,6 +23,7 @@ import org.apache.hadoop.ha.HAServicePro
 import org.apache.hadoop.ha.ZKFCProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -46,6 +47,7 @@ public class HDFSPolicyProvider extends 
     new Service("security.inter.datanode.protocol.acl", 
                 InterDatanodeProtocol.class),
     new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
+    new Service("security.qjournal.service.protocol.acl", QJournalProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL,
         HAServiceProtocol.class),
     new Service(CommonConfigurationKeys.SECURITY_ZKFC_PROTOCOL_ACL,

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Dec  5 19:22:17 2012
@@ -325,12 +325,15 @@ public class PBHelper {
   }
 
   public static RemoteEditLogProto convert(RemoteEditLog log) {
-    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
-        .setStartTxId(log.getStartTxId()).build();
+    return RemoteEditLogProto.newBuilder()
+        .setStartTxId(log.getStartTxId())
+        .setEndTxId(log.getEndTxId())
+        .setIsInProgress(log.isInProgress()).build();
   }
 
   public static RemoteEditLog convert(RemoteEditLogProto l) {
-    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(),
+        l.getIsInProgress());
   }
 
   public static RemoteEditLogManifestProto convert(

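The conversion change carries the new in-progress flag across the wire in both directions. A round-trip sketch using the import paths in this patch's tree (the txids are illustrative):

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;

    public class RemoteEditLogRoundTrip {
      public static void main(String[] args) {
        RemoteEditLog log = new RemoteEditLog(101L, 200L, true); // in-progress
        RemoteEditLogProto proto = PBHelper.convert(log);
        RemoteEditLog back = PBHelper.convert(proto);
        System.out.println(back.isInProgress()); // true: the flag now survives
      }
    }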
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Interface for a remote log which is only communicated with asynchronously.
+ * This is essentially a wrapper around {@link QJournalProtocol} with the key
+ * differences being:
+ * 
+ * <ul>
+ * <li>All methods return {@link ListenableFuture}s instead of synchronous
+ * objects.</li>
+ * <li>The {@link RequestInfo} objects are created by the underlying
+ * implementation.</li>
+ * </ul>
+ */
+interface AsyncLogger {
+  
+  interface Factory {
+    AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr);
+  }
+
+  /**
+   * Send a batch of edits to the logger.
+   * @param segmentTxId the first txid in the current segment
+   * @param firstTxnId the first txid of the edits.
+   * @param numTxns the number of transactions in the batch
+   * @param data the actual data to be sent
+   */
+  public ListenableFuture<Void> sendEdits(
+      final long segmentTxId, final long firstTxnId,
+      final int numTxns, final byte[] data);
+
+  /**
+   * Begin writing a new log segment.
+   * 
+   * @param txid the first txid to be written to the new log
+   */
+  public ListenableFuture<Void> startLogSegment(long txid);
+
+  /**
+   * Finalize a log segment.
+   * 
+   * @param startTxId the first txid that was written to the segment
+   * @param endTxId the last txid that was written to the segment
+   */
+  public ListenableFuture<Void> finalizeLogSegment(
+      long startTxId, long endTxId);
+
+  /**
+   * Allow the remote node to purge edit logs earlier than this.
+   * @param minTxIdToKeep the min txid which must be retained
+   */
+  public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
+
+  /**
+   * Format the log directory.
+   * @param nsInfo the namespace info to format with
+   */
+  public ListenableFuture<Void> format(NamespaceInfo nsInfo);
+
+  /**
+   * @return whether or not the remote node has any valid data.
+   */
+  public ListenableFuture<Boolean> isFormatted();
+  
+  /**
+   * @return the state of the last epoch on the target node.
+   */
+  public ListenableFuture<GetJournalStateResponseProto> getJournalState();
+
+  /**
+   * Begin a new epoch on the target node.
+   */
+  public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch);
+  
+  /**
+   * Fetch the list of edit logs available on the remote node.
+   */
+  public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
+      long fromTxnId);
+
+  /**
+   * Prepare recovery. See the HDFS-3077 design document for details.
+   */
+  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
+      long segmentTxId);
+
+  /**
+   * Accept a recovery proposal. See the HDFS-3077 design document for details.
+   */
+  public ListenableFuture<Void> acceptRecovery(SegmentStateProto log,
+      URL fromUrl);
+
+  /**
+   * Set the epoch number used for all future calls.
+   */
+  public void setEpoch(long e);
+
+  /**
+   * Let the logger know the highest committed txid across all loggers in the
+   * set. This txid may be higher than the last committed txid for <em>this</em>
+   * logger. See HDFS-3863 for details.
+   */
+  public void setCommittedTxId(long txid);
+
+  /**
+   * Build an HTTP URL to fetch the log segment with the given startTxId.
+   */
+  public URL buildURLToFetchLogs(long segmentTxId);
+  
+  /**
+   * Tear down any resources, connections, etc. The proxy may not be used
+   * after this point, and any in-flight RPCs may throw an exception.
+   */
+  public void close();
+
+  /**
+   * Append an HTML-formatted report for this logger's status to the provided
+   * StringBuilder. This is displayed on the NN web UI.
+   */
+  public void appendHtmlReport(StringBuilder sb);
+}

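A sketch of consuming one of these futures directly with Guava; in the patch itself the fan-out and waiting are handled by QuorumCall (below), so the callback wiring here is illustrative only.

    // Assumes: logger is an AsyncLogger; segmentTxId, firstTxnId, numTxns
    // and data are as documented in sendEdits() above.
    ListenableFuture<Void> f =
        logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
    Futures.addCallback(f, new FutureCallback<Void>() {
      @Override
      public void onSuccess(Void v) {
        // edits acked as durable by this JournalNode
      }
      @Override
      public void onFailure(Throwable t) {
        // a real client marks the logger out of sync until the next
        // segment (cf. HDFS-3726 in the subtask list)
      }
    });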
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.jasper.compiler.JspUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Wrapper around a set of Loggers, taking care of fanning out
+ * calls to the underlying loggers and constructing corresponding
+ * {@link QuorumCall} instances.
+ */
+class AsyncLoggerSet {
+  static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
+
+  private final List<AsyncLogger> loggers;
+  
+  private static final long INVALID_EPOCH = -1;
+  private long myEpoch = INVALID_EPOCH;
+  
+  public AsyncLoggerSet(List<AsyncLogger> loggers) {
+    this.loggers = ImmutableList.copyOf(loggers);
+  }
+  
+  void setEpoch(long e) {
+    Preconditions.checkState(!isEpochEstablished(),
+        "Epoch already established: epoch=%s", myEpoch);
+    myEpoch = e;
+    for (AsyncLogger l : loggers) {
+      l.setEpoch(e);
+    }
+  }
+
+  /**
+   * Set the highest successfully committed txid seen by the writer.
+   * This should be called after a successful write to a quorum, and is used
+   * for extra sanity checks against the protocol. See HDFS-3863.
+   */
+  public void setCommittedTxId(long txid) {
+    for (AsyncLogger logger : loggers) {
+      logger.setCommittedTxId(txid);
+    }
+  }
+
+  /**
+   * @return true if an epoch has been established.
+   */
+  boolean isEpochEstablished() {
+    return myEpoch != INVALID_EPOCH;
+  }
+  
+  /**
+   * @return the epoch number for this writer. This may only be called after
+   * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
+   */
+  long getEpoch() {
+    Preconditions.checkState(myEpoch != INVALID_EPOCH,
+        "No epoch created yet");
+    return myEpoch;
+  }
+
+  /**
+   * Close all of the underlying loggers.
+   */
+  void close() {
+    for (AsyncLogger logger : loggers) {
+      logger.close();
+    }
+  }
+  
+  void purgeLogsOlderThan(long minTxIdToKeep) {
+    for (AsyncLogger logger : loggers) {
+      logger.purgeLogsOlderThan(minTxIdToKeep);
+    }
+  }
+
+
+  /**
+   * Wait for a quorum of loggers to respond to the given call. If a quorum
+   * can't be achieved, throws a QuorumException.
+   * @param q the quorum call
+   * @param timeoutMs the number of millis to wait
+   * @param operationName textual description of the operation, for logging
+   * @return a map of successful results
+   * @throws QuorumException if a quorum doesn't respond with success
+   * @throws IOException if the thread is interrupted or times out
+   */
+  <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
+      int timeoutMs, String operationName) throws IOException {
+    int majority = getMajoritySize();
+    try {
+      q.waitFor(
+          loggers.size(), // either all respond
+          majority, // or we get a majority of successes
+          majority, // or we get a majority of failures
+          timeoutMs, operationName);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted waiting " + timeoutMs + "ms for a " +
+          "quorum of nodes to respond.");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting " + timeoutMs + "ms for a " +
+          "quorum of nodes to respond.");
+    }
+    
+    if (q.countSuccesses() < majority) {
+      q.rethrowException("Got too many exceptions to achieve quorum size " +
+          getMajorityString());
+    }
+    
+    return q.getResults();
+  }
+  
+  /**
+   * @return the number of nodes which are required to obtain a quorum.
+   */
+  int getMajoritySize() {
+    return loggers.size() / 2 + 1;
+  }
+  
+  /**
+   * @return a textual description of the majority size (e.g. "2/3" or "3/5")
+   */
+  String getMajorityString() {
+    return getMajoritySize() + "/" + loggers.size();
+  }
+
+  /**
+   * @return the number of loggers behind this set
+   */
+  int size() {
+    return loggers.size();
+  }
+  
+  @Override
+  public String toString() {
+    return "[" + Joiner.on(", ").join(loggers) + "]";
+  }
+
+  /**
+   * Append an HTML-formatted status readout on the current
+   * state of the underlying loggers.
+   * @param sb the StringBuilder to append to
+   */
+  void appendHtmlReport(StringBuilder sb) {
+    sb.append("<table class=\"storage\">");
+    sb.append("<thead><tr><td>JN</td><td>Status</td></tr></thead>\n");
+    for (AsyncLogger l : loggers) {
+      sb.append("<tr>");
+      sb.append("<td>" + JspUtil.escapeXml(l.toString()) + "</td>");
+      sb.append("<td>");
+      l.appendHtmlReport(sb);
+      sb.append("</td></tr>\n");
+    }
+    sb.append("</table>");
+  }
+
+  /**
+   * @return the (mutable) list of loggers, for use in tests to
+   * set up spies
+   */
+  @VisibleForTesting
+  List<AsyncLogger> getLoggersForTests() {
+    return loggers;
+  }
+  
+  ///////////////////////////////////////////////////////////////////////////
+  // The rest of this file is simply boilerplate wrappers which fan-out the
+  // various IPC calls to the underlying AsyncLoggers and wrap the result
+  // in a QuorumCall.
+  ///////////////////////////////////////////////////////////////////////////
+  
+  public QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
+    Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.getJournalState());
+    }
+    return QuorumCall.create(calls);    
+  }
+  
+  public QuorumCall<AsyncLogger, Boolean> isFormatted() {
+    Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.isFormatted());
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
+      NamespaceInfo nsInfo,
+      long epoch) {
+    Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.newEpoch(epoch));
+    }
+    return QuorumCall.create(calls);    
+  }
+
+  public QuorumCall<AsyncLogger, Void> startLogSegment(
+      long txid) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.startLogSegment(txid));
+    }
+    return QuorumCall.create(calls);
+  }
+  
+  public QuorumCall<AsyncLogger, Void> finalizeLogSegment(long firstTxId,
+      long lastTxId) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.finalizeLogSegment(firstTxId, lastTxId));
+    }
+    return QuorumCall.create(calls);
+  }
+  
+  public QuorumCall<AsyncLogger, Void> sendEdits(
+      long segmentTxId, long firstTxnId, int numTxns, byte[] data) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future = 
+        logger.sendEdits(segmentTxId, firstTxnId, numTxns, data);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger, RemoteEditLogManifest>
+      getEditLogManifest(long fromTxnId) {
+    Map<AsyncLogger,
+        ListenableFuture<RemoteEditLogManifest>> calls
+        = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<RemoteEditLogManifest> future =
+          logger.getEditLogManifest(fromTxnId);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  QuorumCall<AsyncLogger, PrepareRecoveryResponseProto>
+      prepareRecovery(long segmentTxId) {
+    Map<AsyncLogger,
+      ListenableFuture<PrepareRecoveryResponseProto>> calls
+      = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<PrepareRecoveryResponseProto> future =
+          logger.prepareRecovery(segmentTxId);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  QuorumCall<AsyncLogger,Void>
+      acceptRecovery(SegmentStateProto log, URL fromURL) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls
+      = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.acceptRecovery(log, fromURL);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.format(nsInfo);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+}

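A worked example of the quorum arithmetic in getMajoritySize() and waitForWriteQuorum() above: with three loggers the majority is two, so a call succeeds on two acks and fails fast once two loggers throw.

    public class QuorumMathExample {
      public static void main(String[] args) {
        int loggers = 3;
        int majority = loggers / 2 + 1; // = 2, as in getMajoritySize()
        // waitForWriteQuorum() unblocks when all 3 respond, when 2 succeed
        // (quorum reached), or when 2 fail (quorum impossible); otherwise
        // it throws IOException after timeoutMs.
        System.out.println(majority + "/" + loggers); // "2/3", cf. getMajorityString()
      }
    }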
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,588 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.SecurityUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.net.InetAddresses;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.util.concurrent.UncaughtExceptionHandlers;
+
+/**
+ * Channel to a remote JournalNode using Hadoop IPC.
+ * All of the calls are run on a separate thread, and return
+ * {@link ListenableFuture} instances to wait for their result.
+ * This allows calls to be bound together using the {@link QuorumCall}
+ * class.
+ */
+@InterfaceAudience.Private
+public class IPCLoggerChannel implements AsyncLogger {
+
+  private final Configuration conf;
+  protected final InetSocketAddress addr;
+  private QJournalProtocol proxy;
+
+  private final ListeningExecutorService executor;
+  private long ipcSerial = 0;
+  private long epoch = -1;
+  private long committedTxId = HdfsConstants.INVALID_TXID;
+  
+  private final String journalId;
+  private final NamespaceInfo nsInfo;
+  private int httpPort = -1;
+  
+  private final IPCLoggerChannelMetrics metrics;
+  
+  /**
+   * The number of bytes of edits data still in the queue.
+   */
+  private int queuedEditsSizeBytes = 0;
+  
+  /**
+   * The highest txid that has been successfully logged on the remote JN.
+   */
+  private long highestAckedTxId = 0;
+
+  /**
+   * Nanotime of the last time we successfully journaled some edits
+   * to the remote node.
+   */
+  private long lastAckNanos = 0;
+
+  /**
+   * Nanotime of the last time that committedTxId was updated. Used
+   * to calculate the lag in terms of time, rather than just a number
+   * of txns.
+   */
+  private long lastCommitNanos = 0;
+  
+  /**
+   * The maximum number of bytes that can be pending in the queue.
+   * This keeps the writer from hitting OOME if one of the loggers
+   * starts responding really slowly. Eventually, the queue
+   * overflows and it starts to treat the logger as having errored.
+   */
+  private final int queueSizeLimitBytes;
+
+  /**
+   * If this logger misses some edits, or restarts in the middle of
+   * a segment, the writer won't be able to write any more edits until
+   * the beginning of the next segment. Upon detecting this situation,
+   * the writer sets this flag to true to avoid sending useless RPCs.
+   */
+  private boolean outOfSync = false;
+  
+  /**
+   * Stopwatch that is restarted each time a heartbeat is sent.
+   */
+  private Stopwatch lastHeartbeatStopwatch = new Stopwatch();
+  
+  private static final long HEARTBEAT_INTERVAL_MILLIS = 1000;
+  
+  static final Factory FACTORY = new AsyncLogger.Factory() {
+    @Override
+    public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
+        String journalId, InetSocketAddress addr) {
+      return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
+    }
+  };
+
+  public IPCLoggerChannel(Configuration conf,
+      NamespaceInfo nsInfo,
+      String journalId,
+      InetSocketAddress addr) {
+    this.conf = conf;
+    this.nsInfo = nsInfo;
+    this.journalId = journalId;
+    this.addr = addr;
+    
+    this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
+    
+    executor = MoreExecutors.listeningDecorator(
+        createExecutor());
+    
+    metrics = IPCLoggerChannelMetrics.create(this);
+  }
+  
+  @Override
+  public synchronized void setEpoch(long epoch) {
+    this.epoch = epoch;
+  }
+  
+  @Override
+  public synchronized void setCommittedTxId(long txid) {
+    Preconditions.checkArgument(txid >= committedTxId,
+        "Trying to move committed txid backwards in client " +
+         "old: %s new: %s", committedTxId, txid);
+    this.committedTxId = txid;
+    this.lastCommitNanos = System.nanoTime();
+  }
+  
+  @Override
+  public void close() {
+    // No more tasks may be submitted after this point.
+    executor.shutdown();
+    if (proxy != null) {
+      // TODO: this can hang for quite some time if the client
+      // is currently in the middle of a call to a downed JN.
+      // We should instead do this asynchronously, and just stop
+      // making any more calls after this point (e.g., clear the queue).
+      RPC.stopProxy(proxy);
+    }
+  }
+  
+  protected QJournalProtocol getProxy() throws IOException {
+    if (proxy != null) return proxy;
+    proxy = createProxy();
+    return proxy;
+  }
+  
+  protected QJournalProtocol createProxy() throws IOException {
+    final Configuration confCopy = new Configuration(conf);
+    
+    // Need to set NODELAY or else batches larger than MTU can trigger 
+    // 40ms nagling delays.
+    confCopy.setBoolean(
+        CommonConfigurationKeysPublic.IPC_CLIENT_TCPNODELAY_KEY,
+        true);
+    
+    RPC.setProtocolEngine(confCopy,
+        QJournalProtocolPB.class, ProtobufRpcEngine.class);
+    return SecurityUtil.doAsLoginUser(
+        new PrivilegedExceptionAction<QJournalProtocol>() {
+          @Override
+          public QJournalProtocol run() throws IOException {
+            RPC.setProtocolEngine(confCopy,
+                QJournalProtocolPB.class, ProtobufRpcEngine.class);
+            QJournalProtocolPB pbproxy = RPC.getProxy(
+                QJournalProtocolPB.class,
+                RPC.getProtocolVersion(QJournalProtocolPB.class),
+                addr, confCopy);
+            return new QJournalProtocolTranslatorPB(pbproxy);
+          }
+        });
+  }
+  
+  
+  /**
+   * Separated out for easy overriding in tests.
+   */
+  @VisibleForTesting
+  protected ExecutorService createExecutor() {
+    return Executors.newSingleThreadExecutor(
+        new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat("Logger channel to " + addr)
+          .setUncaughtExceptionHandler(
+              UncaughtExceptionHandlers.systemExit())
+          .build());
+  }
+  
+  @Override
+  public URL buildURLToFetchLogs(long segmentTxId) {
+    Preconditions.checkArgument(segmentTxId > 0,
+        "Invalid segment: %s", segmentTxId);
+    Preconditions.checkState(httpPort != -1,
+        "HTTP port not set yet");
+        
+    try {
+      String path = GetJournalEditServlet.buildPath(
+          journalId, segmentTxId, nsInfo);
+      return new URL("http", addr.getHostName(), httpPort, path.toString());
+    } catch (MalformedURLException e) {
+      // should never get here.
+      throw new RuntimeException(e);
+    }
+  }
+
+  private synchronized RequestInfo createReqInfo() {
+    Preconditions.checkState(epoch > 0, "bad epoch: %s", epoch);
+    return new RequestInfo(journalId, epoch, ipcSerial++,
+        committedTxId);
+  }
+
+  @VisibleForTesting
+  synchronized long getNextIpcSerial() {
+    return ipcSerial;
+  }
+
+  public synchronized int getQueuedEditsSize() {
+    return queuedEditsSizeBytes;
+  }
+  
+  public InetSocketAddress getRemoteAddress() {
+    return addr;
+  }
+
+  /**
+   * @return true if the server has gotten out of sync with the client,
+   * and thus a log roll is required for this logger to successfully start
+   * logging more edits.
+   */
+  public synchronized boolean isOutOfSync() {
+    return outOfSync;
+  }
+  
+  @VisibleForTesting
+  void waitForAllPendingCalls() throws InterruptedException {
+    try {
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+        }
+      }).get();
+    } catch (ExecutionException e) {
+      // This can't happen!
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public ListenableFuture<Boolean> isFormatted() {
+    return executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws IOException {
+        return getProxy().isFormatted(journalId);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
+    return executor.submit(new Callable<GetJournalStateResponseProto>() {
+      @Override
+      public GetJournalStateResponseProto call() throws IOException {
+        GetJournalStateResponseProto ret =
+            getProxy().getJournalState(journalId);
+        httpPort = ret.getHttpPort();
+        return ret;
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<NewEpochResponseProto> newEpoch(
+      final long epoch) {
+    return executor.submit(new Callable<NewEpochResponseProto>() {
+      @Override
+      public NewEpochResponseProto call() throws IOException {
+        return getProxy().newEpoch(journalId, nsInfo, epoch);
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> sendEdits(
+      final long segmentTxId, final long firstTxnId,
+      final int numTxns, final byte[] data) {
+    try {
+      reserveQueueSpace(data.length);
+    } catch (LoggerTooFarBehindException e) {
+      return Futures.immediateFailedFuture(e);
+    }
+    
+    // When this batch is acked, we use its submission time in order
+    // to calculate how far we are lagging.
+    final long submitNanos = System.nanoTime();
+    
+    ListenableFuture<Void> ret = null;
+    try {
+      ret = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          throwIfOutOfSync();
+
+          long rpcSendTimeNanos = System.nanoTime();
+          try {
+            getProxy().journal(createReqInfo(),
+                segmentTxId, firstTxnId, numTxns, data);
+          } catch (IOException e) {
+            QuorumJournalManager.LOG.warn(
+                "Remote journal " + IPCLoggerChannel.this + " failed to " +
+                "write txns " + firstTxnId + "-" + (firstTxnId + numTxns - 1) +
+                ". Will try to write to this JN again after the next " +
+                "log roll.", e); 
+            synchronized (IPCLoggerChannel.this) {
+              outOfSync = true;
+            }
+            throw e;
+          } finally {
+            long now = System.nanoTime();
+            long rpcTime = TimeUnit.MICROSECONDS.convert(
+                now - rpcSendTimeNanos, TimeUnit.NANOSECONDS);
+            long endToEndTime = TimeUnit.MICROSECONDS.convert(
+                now - submitNanos, TimeUnit.NANOSECONDS);
+            metrics.addWriteEndToEndLatency(endToEndTime);
+            metrics.addWriteRpcLatency(rpcTime);
+          }
+          synchronized (IPCLoggerChannel.this) {
+            highestAckedTxId = firstTxnId + numTxns - 1;
+            lastAckNanos = submitNanos;
+          }
+          return null;
+        }
+      });
+    } finally {
+      if (ret == null) {
+        // it didn't successfully get submitted,
+        // so adjust the queue size back down.
+        unreserveQueueSpace(data.length);
+      } else {
+        // It was submitted to the queue, so adjust the length
+        // once the call completes, regardless of whether it
+        // succeeds or fails.
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+          @Override
+          public void onFailure(Throwable t) {
+            unreserveQueueSpace(data.length);
+          }
+
+          @Override
+          public void onSuccess(Void t) {
+            unreserveQueueSpace(data.length);
+          }
+        });
+      }
+    }
+    return ret;
+  }
+
+  private void throwIfOutOfSync()
+      throws JournalOutOfSyncException, IOException {
+    if (isOutOfSync()) {
+      // Even if we're out of sync, it's useful to send an RPC
+      // to the remote node in order to update its lag metrics, etc.
+      heartbeatIfNecessary();
+      throw new JournalOutOfSyncException(
+          "Journal disabled until next roll");
+    }
+  }
+
+  /**
+   * When we've entered an out-of-sync state, it's still useful to periodically
+   * send an empty RPC to the server, so that it has the up-to-date
+   * committedTxId. This acts as a sanity check during recovery, and also allows
+   * that node's metrics to be up-to-date about its lag.
+   * 
+   * In the future, this method may also be used in order to check that the
+   * current node is still the current writer, even if no edits are being
+   * written.
+   */
+  private void heartbeatIfNecessary() throws IOException {
+    if (lastHeartbeatStopwatch.elapsedMillis() > HEARTBEAT_INTERVAL_MILLIS ||
+        !lastHeartbeatStopwatch.isRunning()) {
+      try {
+        getProxy().heartbeat(createReqInfo());
+      } finally {
+        // Don't send heartbeats more often than the configured interval,
+        // even if they fail.
+        lastHeartbeatStopwatch.reset().start();
+      }
+    }
+  }
+
+  private synchronized void reserveQueueSpace(int size)
+      throws LoggerTooFarBehindException {
+    Preconditions.checkArgument(size >= 0);
+    if (queuedEditsSizeBytes + size > queueSizeLimitBytes &&
+        queuedEditsSizeBytes > 0) {
+      throw new LoggerTooFarBehindException();
+    }
+    queuedEditsSizeBytes += size;
+  }
+  
+  private synchronized void unreserveQueueSpace(int size) {
+    Preconditions.checkArgument(size >= 0);
+    queuedEditsSizeBytes -= size;
+  }
+
+  @Override
+  public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        getProxy().format(journalId, nsInfo);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> startLogSegment(final long txid) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().startLogSegment(createReqInfo(), txid);
+        synchronized (IPCLoggerChannel.this) {
+          if (outOfSync) {
+            outOfSync = false;
+            QuorumJournalManager.LOG.info(
+                "Restarting previously-stopped writes to " +
+                IPCLoggerChannel.this + " in segment starting at txid " +
+                txid);
+          }
+        }
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> finalizeLogSegment(
+      final long startTxId, final long endTxId) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        throwIfOutOfSync();
+        
+        getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
+      final long fromTxnId) {
+    return executor.submit(new Callable<RemoteEditLogManifest>() {
+      @Override
+      public RemoteEditLogManifest call() throws IOException {
+        GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
+            journalId, fromTxnId);
+        // Update the http port, since we need this to build URLs to any of the
+        // returned logs.
+        httpPort = ret.getHttpPort();
+        return PBHelper.convert(ret.getManifest());
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
+      final long segmentTxId) {
+    return executor.submit(new Callable<PrepareRecoveryResponseProto>() {
+      @Override
+      public PrepareRecoveryResponseProto call() throws IOException {
+        if (httpPort < 0) {
+          // If the HTTP port hasn't been set yet, force an RPC call so we know
+          // what the HTTP port should be.
+          httpPort = getProxy().getJournalState(journalId).getHttpPort();
+        }
+        return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<Void> acceptRecovery(
+      final SegmentStateProto log, final URL url) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().acceptRecovery(createReqInfo(), log, url);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public String toString() {
+    return InetAddresses.toAddrString(addr.getAddress()) + ':' +
+        addr.getPort();
+  }
+
+  @Override
+  public synchronized void appendHtmlReport(StringBuilder sb) {
+    sb.append("Written txid ").append(highestAckedTxId);
+    long behind = getLagTxns();
+    if (behind > 0) {
+      if (lastAckNanos != 0) {
+        long lagMillis = getLagTimeMillis();
+        sb.append(" (" + behind + " txns/" + lagMillis + "ms behind)");
+      } else {
+        sb.append(" (never written");
+      }
+    }
+    if (outOfSync) {
+      sb.append(" (will try to re-sync on next segment)");
+    }
+  }
+  
+  public synchronized long getLagTxns() {
+    return Math.max(committedTxId - highestAckedTxId, 0);
+  }
+  
+  public synchronized long getLagTimeMillis() {
+    return TimeUnit.MILLISECONDS.convert(
+        Math.max(lastCommitNanos - lastAckNanos, 0),
+        TimeUnit.NANOSECONDS);
+  }
+}

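For readers tracing the QuorumCall pattern mentioned in the class javadoc above: the
futures returned by several IPCLoggerChannel instances (e.g., from sendEdits()) are
waited on as a group until a majority succeeds. A minimal sketch of that majority-wait
idea, using only Guava primitives already imported by the file above, might look like
the following. The class QuorumWaitSketch and method waitForMajority are hypothetical
illustrations, not part of this commit; unlike the real QuorumCall, the sketch does not
fail fast once a majority of loggers have already thrown.

package org.apache.hadoop.hdfs.qjournal.client;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/** Hypothetical sketch of a majority wait; not part of this commit. */
class QuorumWaitSketch {
  /**
   * Block until a majority of the given futures succeed, or the timeout
   * elapses. Returns true on a successful majority.
   */
  static <V> boolean waitForMajority(List<ListenableFuture<V>> futures,
      long timeout, TimeUnit unit) throws InterruptedException {
    final CountDownLatch successes =
        new CountDownLatch(futures.size() / 2 + 1);
    for (ListenableFuture<V> f : futures) {
      Futures.addCallback(f, new FutureCallback<V>() {
        @Override
        public void onSuccess(V result) {
          successes.countDown();
        }
        @Override
        public void onFailure(Throwable t) {
          // Failures are simply ignored here; the latch only counts acks.
        }
      });
    }
    return successes.await(timeout, unit);
  }
}
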
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannelMetrics.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+
+import com.google.common.collect.Maps;
+
+/**
+ * The metrics for a journal from the writer's perspective.
+ */
+@Metrics(about="Journal client metrics", context="dfs")
+class IPCLoggerChannelMetrics {
+  final MetricsRegistry registry = new MetricsRegistry("NameNode");
+
+  private volatile IPCLoggerChannel ch;
+  
+  private final MutableQuantiles[] writeEndToEndLatencyQuantiles;
+  private final MutableQuantiles[] writeRpcLatencyQuantiles;
+
+  
+  /**
+   * When the NN transitions between states, edit logs are closed
+   * and reopened. Thus, the IPCLoggerChannel instance that writes to a
+   * given JournalNode may change over the lifetime of the process.
+   * However, metrics2 doesn't have a function to unregister a set of metrics
+   * and fails if a new metrics class is registered with the same name
+   * as the existing one. Hence, we have to maintain our own registry
+   * ("multiton") here, so that we have exactly one metrics instance
+   * per JournalNode, and switch out the pointer to the underlying
+   * IPCLoggerChannel instance.
+   */
+  private static final Map<String, IPCLoggerChannelMetrics> REGISTRY =
+      Maps.newHashMap();
+  
+  private IPCLoggerChannelMetrics(IPCLoggerChannel ch) {
+    this.ch = ch;
+    
+    Configuration conf = new HdfsConfiguration();
+    int[] intervals = 
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    if (intervals != null) {
+      writeEndToEndLatencyQuantiles = new MutableQuantiles[intervals.length];
+      writeRpcLatencyQuantiles = new MutableQuantiles[intervals.length];
+      for (int i = 0; i < writeEndToEndLatencyQuantiles.length; i++) {
+        int interval = intervals[i];
+        writeEndToEndLatencyQuantiles[i] = registry.newQuantiles(
+            "writesE2E" + interval + "s",
+            "End-to-end time for write operations", "ops", "LatencyMicros", interval);
+        writeRpcLatencyQuantiles[i] = registry.newQuantiles(
+            "writesRpc" + interval + "s",
+            "RPC RTT for write operations", "ops", "LatencyMicros", interval);
+      }
+    } else {
+      writeEndToEndLatencyQuantiles = null;
+      writeRpcLatencyQuantiles = null;
+    }
+  }
+  
+  private void setChannel(IPCLoggerChannel ch) {
+    assert ch.getRemoteAddress().equals(this.ch.getRemoteAddress());
+    this.ch = ch;
+  }
+
+  static IPCLoggerChannelMetrics create(IPCLoggerChannel ch) {
+    String name = getName(ch);
+    synchronized (REGISTRY) {
+      IPCLoggerChannelMetrics m = REGISTRY.get(name);
+      if (m != null) {
+        m.setChannel(ch);
+      } else {
+        m = new IPCLoggerChannelMetrics(ch);
+        DefaultMetricsSystem.instance().register(name, null, m);
+        REGISTRY.put(name, m);
+      }
+      return m;
+    }
+  }
+
+  private static String getName(IPCLoggerChannel ch) {
+    InetSocketAddress addr = ch.getRemoteAddress();
+    String addrStr = addr.getAddress().getHostAddress();
+    
+    // IPv6 addresses have colons, which aren't allowed as part of
+    // MBean names. Replace them with '.'.
+    addrStr = addrStr.replace(':', '.');
+    
+    return "IPCLoggerChannel-" + addrStr +
+        "-" + addr.getPort();
+  }
+
+  @Metric("Is the remote logger out of sync with the quorum")
+  public String isOutOfSync() {
+    return Boolean.toString(ch.isOutOfSync()); 
+  }
+  
+  @Metric("The number of transactions the remote log is lagging behind the " +
+          "quorum")
+  public long getCurrentLagTxns() {
+    return ch.getLagTxns();
+  }
+  
+  @Metric("The number of milliseconds the remote log is lagging behind the " +
+          "quorum")
+  public long getLagTimeMillis() {
+    return ch.getLagTimeMillis();
+  }
+  
+  @Metric("The number of bytes of pending data to be sent to the remote node")
+  public int getQueuedEditsSize() {
+    return ch.getQueuedEditsSize();
+  }
+
+  public void addWriteEndToEndLatency(long micros) {
+    if (writeEndToEndLatencyQuantiles != null) {
+      for (MutableQuantiles q : writeEndToEndLatencyQuantiles) {
+        q.add(micros);
+      }
+    }
+  }
+  
+  public void addWriteRpcLatency(long micros) {
+    if (writeRpcLatencyQuantiles != null) {
+      for (MutableQuantiles q : writeRpcLatencyQuantiles) {
+        q.add(micros);
+      }
+    }
+  }
+}

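The registry comment in IPCLoggerChannelMetrics above describes a "multiton": each
metrics name is registered with metrics2 exactly once, and when an NN state transition
creates a new IPCLoggerChannel for the same JournalNode, only the channel pointer is
swapped. A stripped-down sketch of that register-once, swap-delegate pattern follows;
the names MultitonSketch and Source are hypothetical, not part of this commit.

package org.apache.hadoop.hdfs.qjournal.client;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the register-once, swap-delegate pattern. */
class MultitonSketch {
  interface Source {
    long getLagTxns();
  }

  private static final Map<String, MultitonSketch> REGISTRY =
      new HashMap<String, MultitonSketch>();

  // volatile so metric reads see the most recently swapped-in source.
  private volatile Source source;

  private MultitonSketch(Source source) {
    this.source = source;
  }

  static MultitonSketch create(String name, Source source) {
    synchronized (REGISTRY) {
      MultitonSketch m = REGISTRY.get(name);
      if (m != null) {
        m.source = source;   // reuse the registration, swap the pointer
      } else {
        m = new MultitonSketch(source);
        // A real implementation registers with the metrics system exactly
        // here, on first creation only.
        REGISTRY.put(name, m);
      }
      return m;
    }
  }

  long getCurrentLagTxns() {
    return source.getLagTxns();
  }
}
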
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+
+class LoggerTooFarBehindException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+}
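
LoggerTooFarBehindException surfaces through the future returned by sendEdits() (see
reserveQueueSpace() in IPCLoggerChannel above), not as a direct throw from the caller's
thread. A hedged usage sketch of how a caller might distinguish queue overflow from
other failures follows; the class AckWaitSketch is hypothetical, not part of this
commit, and assumes the same package so the package-private exception is visible.

package org.apache.hadoop.hdfs.qjournal.client;

import java.util.concurrent.ExecutionException;

import com.google.common.util.concurrent.ListenableFuture;

/** Hypothetical sketch of observing queue overflow; not part of this commit. */
class AckWaitSketch {
  /**
   * Waits for one sendEdits() ack. Returns false if this logger's queue
   * overflowed, in which case the writer stops sending to it until the
   * next log roll while the rest of the quorum carries on.
   */
  static boolean awaitAck(ListenableFuture<Void> future)
      throws InterruptedException, ExecutionException {
    try {
      future.get();
      return true;
    } catch (ExecutionException e) {
      if (e.getCause() instanceof LoggerTooFarBehindException) {
        return false;
      }
      throw e; // some other failure; let the quorum logic decide
    }
  }
}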