hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1363596 [1/3] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs...
Date: Fri, 20 Jul 2012 00:25:52 GMT
Author: todd
Date: Fri Jul 20 00:25:50 2012
New Revision: 1363596

URL: http://svn.apache.org/viewvc?rev=1363596&view=rev
Log:
HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude.

Added:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
Modified:
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/pom.xml
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt Fri Jul 20 00:25:50 2012
@@ -0,0 +1,4 @@
+Changes for HDFS-3077 branch.
+This will be merged into the main CHANGES.txt when the branch is merged.
+
+HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude.

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml Fri Jul 20 00:25:50 2012
@@ -9,6 +9,9 @@
        <Package name="org.apache.hadoop.hdfs.server.namenode.ha.proto" />
      </Match>
      <Match>
+       <Package name="org.apache.hadoop.hdfs.qjournal.protocol" />
+     </Match>
+     <Match>
        <Bug pattern="EI_EXPOSE_REP" />
      </Match>
      <Match>
@@ -273,4 +276,11 @@
        <Method name="quit" />
        <Bug pattern="DM_EXIT" />
      </Match>
+
+     <!-- More complex cleanup logic confuses findbugs -->
+     <Match>
+       <Class name="org.apache.hadoop.hdfs.qjournal.server.Journal" />
+       <Method name="persistPaxosData" />
+       <Bug pattern="OS_OPEN_STREAM" />
+     </Match>
  </FindBugsFilter>

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/pom.xml?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/pom.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/pom.xml Fri Jul 20 00:25:50 2012
@@ -178,6 +178,25 @@ http://maven.apache.org/xsd/maven-4.0.0.
             </configuration>
           </execution>
           <execution>
+            <id>journal</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <configuration>
+              <compile>false</compile>
+              <workingDirectory>${project.build.directory}/generated-src/main/jsp</workingDirectory>
+              <webFragmentFile>${project.build.directory}/journal-jsp-servlet-definitions.xml</webFragmentFile>
+              <packageName>org.apache.hadoop.hdfs.server.journalservice</packageName>
+              <sources>
+                <directory>${basedir}/src/main/webapps/journal</directory>
+                <includes>
+                  <include>*.jsp</include>
+                </includes>
+              </sources>
+            </configuration>
+          </execution>
+          <execution>
             <id>datanode</id>
             <phase>generate-sources</phase>
             <goals>
@@ -284,6 +303,7 @@ http://maven.apache.org/xsd/maven-4.0.0.
                 <loadfile property="hdfs.servlet.definitions" srcFile="${project.build.directory}/hdfs-jsp-servlet-definitions.xml"/>
                 <loadfile property="secondary.servlet.definitions" srcFile="${project.build.directory}/secondary-jsp-servlet-definitions.xml"/>
                 <loadfile property="datanode.servlet.definitions" srcFile="${project.build.directory}/datanode-jsp-servlet-definitions.xml"/>
+                <loadfile property="journal.servlet.definitions" srcFile="${project.build.directory}/journal-jsp-servlet-definitions.xml"/>               
                 <echoproperties destfile="${project.build.directory}/webxml.properties">
                   <propertyset>
                     <propertyref regex=".*.servlet.definitions"/>
@@ -299,6 +319,9 @@ http://maven.apache.org/xsd/maven-4.0.0.
                 <copy file="${basedir}/src/main/webapps/proto-datanode-web.xml"
                       tofile="${project.build.directory}/webapps/datanode/WEB-INF/web.xml"
                       filtering="true"/>
+                <copy file="${basedir}/src/main/webapps/proto-journal-web.xml"
+                      tofile="${project.build.directory}/webapps/journal/WEB-INF/web.xml"
+                      filtering="true"/>
                 <copy toDir="${project.build.directory}/webapps">
                   <fileset dir="${basedir}/src/main/webapps">
                     <exclude name="**/*.jsp"/>

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs Fri Jul 20 00:25:50 2012
@@ -30,6 +30,7 @@ function print_usage(){
   echo "  namenode -format     format the DFS filesystem"
   echo "  secondarynamenode    run the DFS secondary namenode"
   echo "  namenode             run the DFS namenode"
+  echo "  journalnode          run the DFS journalnode"
   echo "  zkfc                 run the ZK Failover Controller daemon"
   echo "  datanode             run a DFS datanode"
   echo "  dfsadmin             run a DFS admin client"
@@ -90,6 +91,9 @@ elif [ "$COMMAND" = "datanode" ] ; then
   else
     HADOOP_OPTS="$HADOOP_OPTS -server $HADOOP_DATANODE_OPTS"
   fi
+elif [ "$COMMAND" = "journalnode" ] ; then
+  CLASS='org.apache.hadoop.hdfs.qjournal.server.JournalNode'
+  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_JOURNALNODE_OPTS"
 elif [ "$COMMAND" = "dfs" ] ; then
   CLASS=org.apache.hadoop.fs.FsShell
   HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jul 20 00:25:50 2012
@@ -366,4 +366,37 @@ public class DFSConfigKeys extends Commo
   public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
   public static final String DFS_HA_ZKFC_PORT_KEY = "dfs.ha.zkfc.port";
   public static final int DFS_HA_ZKFC_PORT_DEFAULT = 8019;
+  
+  // Journal-node related configs. These are read on the JN side.
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";
+  public static final String  DFS_JOURNALNODE_EDITS_DIR_DEFAULT = "/tmp/hadoop/dfs/journalnode/";
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_KEY = "dfs.journalnode.rpc-address";
+  public static final int     DFS_JOURNALNODE_RPC_PORT_DEFAULT = 8485;
+  public static final String  DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_RPC_PORT_DEFAULT;
+    
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_KEY = "dfs.journalnode.http-address";
+  public static final int     DFS_JOURNALNODE_HTTP_PORT_DEFAULT = 8480;
+  public static final String  DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_JOURNALNODE_HTTP_PORT_DEFAULT;
+
+  public static final String  DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file";
+  public static final String  DFS_JOURNALNODE_USER_NAME_KEY = "dfs.journalnode.kerberos.principal";
+  public static final String  DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.journalnode.kerberos.internal.spnego.principal";
+
+  // Journal-node related configs for the client side.
+  public static final String  DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb";
+  public static final int     DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT = 10;
+  
+  // Quorum-journal timeouts for various operations. Unlikely to need
+  // to be tweaked, but configurable just in case.
+  public static final String  DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.start-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.prepare-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY = "dfs.qjournal.accept-recovery.timeout.ms";
+  public static final String  DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY = "dfs.qjournal.finalize-segment.timeout.ms";
+  public static final String  DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY = "dfs.qjournal.select-input-streams.timeout.ms";
+  public static final int     DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT = 60000;
+  public static final int     DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT = 20000;
+  public static final int     DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT = 20000;
 }
+
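To illustrate how the keys above are consumed, here is a minimal sketch of
reading the client-side timeouts through the standard Configuration API
(the wrapper class is illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class QJournalTimeoutsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Each key falls back to its compiled-in default when unset.
        int startSegmentTimeoutMs = conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
        int acceptRecoveryTimeoutMs = conf.getInt(
            DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
            DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
        System.out.println("start-segment: " + startSegmentTimeoutMs + "ms, "
            + "accept-recovery: " + acceptRecoveryTimeoutMs + "ms");
      }
    }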

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Fri Jul 20 00:25:50 2012
@@ -328,12 +328,15 @@ public class PBHelper {
   }
 
   public static RemoteEditLogProto convert(RemoteEditLog log) {
-    return RemoteEditLogProto.newBuilder().setEndTxId(log.getEndTxId())
-        .setStartTxId(log.getStartTxId()).build();
+    return RemoteEditLogProto.newBuilder()
+        .setStartTxId(log.getStartTxId())
+        .setEndTxId(log.getEndTxId())
+        .setIsInProgress(log.isInProgress()).build();
   }
 
   public static RemoteEditLog convert(RemoteEditLogProto l) {
-    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId());
+    return new RemoteEditLog(l.getStartTxId(), l.getEndTxId(),
+        l.getIsInProgress());
   }
 
   public static RemoteEditLogManifestProto convert(
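
As a usage sketch, the new isInProgress field now survives a round trip
through these converters (the proto import path is assumed from trunk at
the time; the demo class itself is not part of this patch):

    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;

    public class RemoteEditLogRoundTrip {
      public static void main(String[] args) {
        // An in-progress segment starting at txid 1, last written txid 100.
        RemoteEditLog log = new RemoteEditLog(1L, 100L, true);
        RemoteEditLogProto proto = PBHelper.convert(log);
        RemoteEditLog back = PBHelper.convert(proto);
        // Before this change the flag was dropped; now it is preserved.
        System.out.println("in-progress preserved: " + back.isInProgress());
      }
    }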

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Interface for a remote log to which all communication is asynchronous.
+ * This is essentially a wrapper around {@link QJournalProtocol} with the key
+ * differences being:
+ * 
+ * <ul>
+ * <li>All methods return {@link ListenableFuture}s instead of synchronous
+ * objects.</li>
+ * <li>The {@link RequestInfo} objects are created by the underlying
+ * implementation.</li>
+ * </ul>
+ */
+interface AsyncLogger {
+  
+  /**
+   * Send a batch of edits to the logger.
+   * @param firstTxnId the first txid of the edits.
+   * @param numTxns the number of transactions in the batch
+   * @param data the actual data to be sent
+   */
+  public ListenableFuture<Void> sendEdits(
+      final long firstTxnId, final int numTxns, final byte[] data);
+
+  /**
+   * Begin writing a new log segment.
+   * 
+   * @param txid the first txid to be written to the new log
+   */
+  public ListenableFuture<Void> startLogSegment(long txid);
+
+  /**
+   * Finalize a log segment.
+   * 
+   * @param startTxId the first txid that was written to the segment
+   * @param endTxId the last txid that was written to the segment
+   */
+  public ListenableFuture<Void> finalizeLogSegment(
+      long startTxId, long endTxId);
+
+  /**
+   * @return the state of the last epoch on the target node.
+   */
+  public ListenableFuture<GetJournalStateResponseProto> getJournalState();
+
+  /**
+   * Begin a new epoch on the target node.
+   */
+  public ListenableFuture<NewEpochResponseProto> newEpoch(long epoch);
+  
+  /**
+   * Fetch the list of edit logs available on the remote node.
+   */
+  public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+      long fromTxnId);
+
+  /**
+   * Prepare recovery. See the HDFS-3077 design document for details.
+   */
+  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
+      long segmentTxId);
+
+  /**
+   * Accept a recovery proposal. See the HDFS-3077 design document for details.
+   */
+  public ListenableFuture<Void> acceptRecovery(SegmentStateProto log,
+      URL fromUrl);
+
+  /**
+   * Set the epoch number used for all future calls.
+   */
+  public void setEpoch(long e);
+
+  /**
+   * Build an HTTP URL to fetch the log segment with the given startTxId.
+   */
+  public URL buildURLToFetchLogs(long segmentTxId);
+  
+  /**
+   * Tear down any resources, connections, etc. The proxy may not be used
+   * after this point, and any in-flight RPCs may throw an exception.
+   */
+  public void close();
+}
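To make the calling convention concrete: every method hands back a future
immediately, and the caller collects results later. Below is a no-op stub
in the style a unit test might use (illustrative only, not part of this
patch):

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;

    class NoOpLoggerStub {
      // Mirrors AsyncLogger#sendEdits: returns an already-completed
      // future rather than blocking on any I/O.
      public ListenableFuture<Void> sendEdits(
          long firstTxnId, int numTxns, byte[] data) {
        System.out.println("would journal " + numTxns + " txns starting at "
            + firstTxnId + " (" + data.length + " bytes)");
        return Futures.immediateFuture((Void) null);
      }
    }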

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Wrapper around a set of Loggers, taking care of fanning out
+ * calls to the underlying loggers and constructing corresponding
+ * {@link QuorumCall} instances.
+ */
+class AsyncLoggerSet {
+  static final Log LOG = LogFactory.getLog(AsyncLoggerSet.class);
+
+  private static final int NEWEPOCH_TIMEOUT_MS = 10000;
+  
+  private final List<AsyncLogger> loggers;
+  
+  private static final long INVALID_EPOCH = -1;
+  private long myEpoch = INVALID_EPOCH;
+  
+  public AsyncLoggerSet(List<AsyncLogger> loggers) {
+    this.loggers = ImmutableList.copyOf(loggers);
+  }
+  
+  /**
+   * Fence any previous writers, and obtain a unique epoch number
+   * for write-access to the journal nodes.
+   *
+   * @param nsInfo the expected namespace information. If the remote
+   * node does not match this namespace, the request will be rejected.
+   * @return the new, unique epoch number
+   * @throws IOException
+   */
+  Map<AsyncLogger, NewEpochResponseProto> createNewUniqueEpoch(
+      NamespaceInfo nsInfo) throws IOException {
+    Preconditions.checkState(myEpoch == -1,
+        "epoch already created: epoch=" + myEpoch);
+    
+    Map<AsyncLogger, GetJournalStateResponseProto> lastPromises =
+      waitForWriteQuorum(getJournalState(), NEWEPOCH_TIMEOUT_MS);
+    
+    long maxPromised = Long.MIN_VALUE;
+    for (GetJournalStateResponseProto resp : lastPromises.values()) {
+      maxPromised = Math.max(maxPromised, resp.getLastPromisedEpoch());
+    }
+    assert maxPromised >= 0;
+    
+    long myEpoch = maxPromised + 1;
+    Map<AsyncLogger, NewEpochResponseProto> resps =
+        waitForWriteQuorum(newEpoch(nsInfo, myEpoch), NEWEPOCH_TIMEOUT_MS);
+    this.myEpoch = myEpoch;
+    setEpoch(myEpoch);
+    return resps;
+  }
+  
+  private void setEpoch(long e) {
+    for (AsyncLogger l : loggers) {
+      l.setEpoch(e);
+    }
+  }
+
+  /**
+   * @return the epoch number for this writer. This may only be called after
+   * a successful call to {@link #createNewUniqueEpoch(NamespaceInfo)}.
+   */
+  long getEpoch() {
+    Preconditions.checkState(myEpoch != INVALID_EPOCH,
+        "No epoch created yet");
+    return myEpoch;
+  }
+
+  /**
+   * Close all of the underlying loggers.
+   */
+  void close() {
+    for (AsyncLogger logger : loggers) {
+      logger.close();
+    }
+  }
+
+
+  /**
+   * Wait for a quorum of loggers to respond to the given call. If a quorum
+   * can't be achieved, throws a QuorumException.
+   * @param q the quorum call
+   * @param timeoutMs the number of millis to wait
+   * @return a map of successful results
+   * @throws QuorumException if a quorum doesn't respond with success
+   * @throws IOException if the thread is interrupted or times out
+   */
+  <V> Map<AsyncLogger, V> waitForWriteQuorum(QuorumCall<AsyncLogger, V> q,
+      int timeoutMs) throws IOException {
+    int majority = getMajoritySize();
+    try {
+      q.waitFor(
+          loggers.size(), // either all respond 
+          majority, // or we get a majority of successes
+          majority, // or we get a majority of failures,
+          timeoutMs);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted waiting for quorum results");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting " + timeoutMs + " for write quorum");
+    }
+    
+    if (q.countSuccesses() < majority) {
+      q.rethrowException("Got too many exceptions to achieve quorum size " +
+          getMajorityString());
+    }
+    
+    return q.getResults();
+  }
+  
+  /**
+   * @return the number of nodes which are required to obtain a quorum.
+   */
+  int getMajoritySize() {
+    return loggers.size() / 2 + 1;
+  }
+  
+  /**
+   * @return a textual description of the majority size (e.g. "2/3" or "3/5")
+   */
+  String getMajorityString() {
+    return getMajoritySize() + "/" + loggers.size();
+  }
+
+  /**
+   * @return the number of loggers behind this set
+   */
+  int size() {
+    return loggers.size();
+  }
+
+  /**
+   * @return the (mutable) list of loggers, for use in tests to
+   * set up spies
+   */
+  @VisibleForTesting
+  List<AsyncLogger> getLoggersForTests() {
+    return loggers;
+  }
+  
+  ///////////////////////////////////////////////////////////////////////////
+  // The rest of this file is simply boilerplate wrappers which fan out the
+  // various IPC calls to the underlying AsyncLoggers and wrap the result
+  // in a QuorumCall.
+  ///////////////////////////////////////////////////////////////////////////
+  
+  private QuorumCall<AsyncLogger, GetJournalStateResponseProto> getJournalState() {
+    Map<AsyncLogger, ListenableFuture<GetJournalStateResponseProto>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.getJournalState());
+    }
+    return QuorumCall.create(calls);    
+  }
+
+  private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
+      NamespaceInfo nsInfo,
+      long epoch) {
+    Map<AsyncLogger, ListenableFuture<NewEpochResponseProto>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.newEpoch(epoch));
+    }
+    return QuorumCall.create(calls);    
+  }
+
+  public QuorumCall<AsyncLogger, Void> startLogSegment(
+      long txid) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.startLogSegment(txid));
+    }
+    return QuorumCall.create(calls);
+  }
+  
+  public QuorumCall<AsyncLogger, Void> finalizeLogSegment(long firstTxId,
+      long lastTxId) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      calls.put(logger, logger.finalizeLogSegment(firstTxId, lastTxId));
+    }
+    return QuorumCall.create(calls);
+  }
+  
+  public QuorumCall<AsyncLogger, Void> sendEdits(
+      long firstTxnId, int numTxns, byte[] data) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future = 
+        logger.sendEdits(firstTxnId, numTxns, data);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  public QuorumCall<AsyncLogger,GetEditLogManifestResponseProto>
+      getEditLogManifest(long fromTxnId) {
+    Map<AsyncLogger,
+        ListenableFuture<GetEditLogManifestResponseProto>> calls
+        = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<GetEditLogManifestResponseProto> future =
+          logger.getEditLogManifest(fromTxnId);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  QuorumCall<AsyncLogger, PrepareRecoveryResponseProto>
+      prepareRecovery(long segmentTxId) {
+    Map<AsyncLogger,
+      ListenableFuture<PrepareRecoveryResponseProto>> calls
+      = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<PrepareRecoveryResponseProto> future =
+          logger.prepareRecovery(segmentTxId);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+
+  QuorumCall<AsyncLogger,Void>
+      acceptRecovery(SegmentStateProto log, URL fromURL) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls
+      = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.acceptRecovery(log, fromURL);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
+}
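A worked example of the quorum arithmetic used by waitForWriteQuorum
(standalone sketch, not part of this patch): for n loggers the majority is
n/2 + 1 under integer division, and waitFor(n, majority, majority,
timeoutMs) returns once all n loggers have responded, at least a majority
have succeeded, or more than a majority have failed, whichever comes first.

    public class MajorityMath {
      static int majority(int loggers) {
        return loggers / 2 + 1; // same formula as getMajoritySize()
      }
      public static void main(String[] args) {
        for (int n : new int[] {3, 5, 7}) {
          // 3 -> 2, 5 -> 3, 7 -> 4
          System.out.println(n + " loggers -> quorum of " + majority(n));
        }
      }
    }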

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.qjournal.server.GetJournalEditServlet;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.common.util.concurrent.UncaughtExceptionHandlers;
+
+/**
+ * Channel to a remote JournalNode using Hadoop IPC.
+ * All of the calls are run on a separate thread, and return
+ * {@link ListenableFuture} instances to wait for their result.
+ * This allows calls to be bound together using the {@link QuorumCall}
+ * class.
+ */
+@InterfaceAudience.Private
+public class IPCLoggerChannel implements AsyncLogger {
+
+  private final Configuration conf;
+  private final InetSocketAddress addr;
+  private QJournalProtocol proxy;
+
+  private final ListeningExecutorService executor;
+  private long ipcSerial = 0;
+  private long epoch = -1;
+  private final String journalId;
+  private final NamespaceInfo nsInfo;
+  private int httpPort = -1;
+  
+  /**
+   * The number of bytes of edits data still in the queue.
+   */
+  private int queuedEditsSizeBytes = 0;
+
+  /**
+   * The maximum number of bytes that can be pending in the queue.
+   * This keeps the writer from hitting OOME if one of the loggers
+   * starts responding really slowly: once the queue overflows,
+   * the channel treats the logger as having errored.
+   */
+  private final int queueSizeLimitBytes;
+  
+  public IPCLoggerChannel(Configuration conf,
+      NamespaceInfo nsInfo,
+      String journalId,
+      InetSocketAddress addr) {
+    this.conf = conf;
+    this.nsInfo = nsInfo;
+    this.journalId = journalId;
+    this.addr = addr;
+    
+    this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
+    
+    executor = MoreExecutors.listeningDecorator(
+        Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Logger channel to " + addr)
+            .setUncaughtExceptionHandler(
+                UncaughtExceptionHandlers.systemExit())
+            .build()));
+  }
+  
+  @Override
+  public synchronized void setEpoch(long epoch) {
+    this.epoch = epoch;
+  }
+  
+  @Override
+  public void close() {
+    // No more tasks may be submitted after this point.
+    executor.shutdown();
+    if (proxy != null) {
+      // TODO: this can hang for quite some time if the client
+      // is currently in the middle of a call to a downed JN.
+      // We should instead do this asynchronously, and just stop
+      // making any more calls after this point (e.g. clear the queue)
+      RPC.stopProxy(proxy);
+    }
+  }
+  
+  protected QJournalProtocol getProxy() throws IOException {
+    if (proxy != null) return proxy;
+
+    RPC.setProtocolEngine(conf,
+        QJournalProtocolPB.class, ProtobufRpcEngine.class);
+    QJournalProtocolPB pbproxy = RPC.getProxy(
+        QJournalProtocolPB.class,
+        RPC.getProtocolVersion(QJournalProtocolPB.class),
+        addr, conf);
+    proxy = new QJournalProtocolTranslatorPB(pbproxy);
+    return proxy;
+  }
+  
+  @Override
+  public URL buildURLToFetchLogs(long segmentTxId) {
+    Preconditions.checkArgument(segmentTxId > 0,
+        "Invalid segment: %s", segmentTxId);
+    Preconditions.checkState(httpPort != -1,
+        "HTTP port not set yet");
+        
+    try {
+      String path = GetJournalEditServlet.buildPath(
+          journalId, segmentTxId, nsInfo);
+      return new URL("http", addr.getHostName(), httpPort, path.toString());
+    } catch (MalformedURLException e) {
+      // should never get here.
+      throw new RuntimeException(e);
+    }
+  }
+
+  private synchronized RequestInfo createReqInfo() {
+    Preconditions.checkState(epoch > 0, "bad epoch: " + epoch);
+    return new RequestInfo(journalId, epoch, ipcSerial++);
+  }
+
+  @VisibleForTesting
+  synchronized long getNextIpcSerial() {
+    return ipcSerial;
+  }
+
+  public synchronized int getQueuedEditsSize() {
+    return queuedEditsSizeBytes;
+  }
+  
+  @VisibleForTesting
+  void waitForAllPendingCalls() throws InterruptedException {
+    try {
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+        }
+      }).get();
+    } catch (ExecutionException e) {
+      // This can't happen!
+      throw new AssertionError(e);
+    }
+  }
+
+  @Override
+  public ListenableFuture<GetJournalStateResponseProto> getJournalState() {
+    return executor.submit(new Callable<GetJournalStateResponseProto>() {
+      @Override
+      public GetJournalStateResponseProto call() throws IOException {
+        GetJournalStateResponseProto ret =
+            getProxy().getJournalState(journalId);
+        httpPort = ret.getHttpPort();
+        return ret;
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<NewEpochResponseProto> newEpoch(
+      final long epoch) {
+    return executor.submit(new Callable<NewEpochResponseProto>() {
+      @Override
+      public NewEpochResponseProto call() throws IOException {
+        return getProxy().newEpoch(journalId, nsInfo, epoch);
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> sendEdits(
+      final long firstTxnId, final int numTxns, final byte[] data) {
+    try {
+      reserveQueueSpace(data.length);
+    } catch (LoggerTooFarBehindException e) {
+      return Futures.immediateFailedFuture(e);
+    }
+    ListenableFuture<Void> ret = null;
+    try {
+      ret = executor.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws IOException {
+          getProxy().journal(createReqInfo(), firstTxnId, numTxns, data);
+          return null;
+        }
+      });
+    } finally {
+      if (ret == null) {
+        // it didn't successfully get submitted,
+        // so adjust the queue size back down.
+        unreserveQueueSpace(data.length);
+      } else {
+        // It was submitted to the queue, so adjust the queue size
+        // once the call completes, regardless of whether it
+        // succeeds or fails.
+        Futures.addCallback(ret, new FutureCallback<Void>() {
+          @Override
+          public void onFailure(Throwable t) {
+            unreserveQueueSpace(data.length);
+          }
+
+          @Override
+          public void onSuccess(Void t) {
+            unreserveQueueSpace(data.length);
+          }
+        });
+      }
+    }
+    return ret;
+  }
+
+  private synchronized void reserveQueueSpace(int size)
+      throws LoggerTooFarBehindException {
+    Preconditions.checkArgument(size >= 0);
+    if (queuedEditsSizeBytes + size > queueSizeLimitBytes &&
+        queuedEditsSizeBytes > 0) {
+      throw new LoggerTooFarBehindException();
+    }
+    queuedEditsSizeBytes += size;
+  }
+  
+  private synchronized void unreserveQueueSpace(int size) {
+    Preconditions.checkArgument(size >= 0);
+    queuedEditsSizeBytes -= size;
+  }
+
+  @Override
+  public ListenableFuture<Void> startLogSegment(final long txid) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().startLogSegment(createReqInfo(), txid);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<Void> finalizeLogSegment(
+      final long startTxId, final long endTxId) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().finalizeLogSegment(createReqInfo(), startTxId, endTxId);
+        return null;
+      }
+    });
+  }
+  
+  @Override
+  public ListenableFuture<GetEditLogManifestResponseProto> getEditLogManifest(
+      final long fromTxnId) {
+    return executor.submit(new Callable<GetEditLogManifestResponseProto>() {
+      @Override
+      public GetEditLogManifestResponseProto call() throws IOException {
+        return getProxy().getEditLogManifest(journalId, fromTxnId);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery(
+      final long segmentTxId) {
+    return executor.submit(new Callable<PrepareRecoveryResponseProto>() {
+      @Override
+      public PrepareRecoveryResponseProto call() throws IOException {
+        return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
+      }
+    });
+  }
+
+  @Override
+  public ListenableFuture<Void> acceptRecovery(
+      final SegmentStateProto log, final URL url) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws IOException {
+        getProxy().acceptRecovery(createReqInfo(), log, url);
+        return null;
+      }
+    });
+  }
+
+  @Override
+  public String toString() {
+    return "Channel to journal node " + addr; 
+  }
+}
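The queue accounting above amounts to a simple admission gate: space is
reserved before a call is queued and released when the call completes,
success or failure. A standalone sketch of the same scheme (names here are
illustrative, not part of this patch):

    import java.io.IOException;

    class EditQueueGate {
      private final int limitBytes;
      private int queuedBytes = 0;

      EditQueueGate(int limitBytes) {
        this.limitBytes = limitBytes;
      }

      // Mirrors reserveQueueSpace: the first batch is always admitted,
      // so a single oversize batch cannot wedge the channel forever.
      synchronized void reserve(int size) throws IOException {
        if (queuedBytes + size > limitBytes && queuedBytes > 0) {
          throw new IOException("logger too far behind");
        }
        queuedBytes += size;
      }

      // Called from a completion callback, whether the RPC
      // succeeded or failed.
      synchronized void unreserve(int size) {
        queuedBytes -= size;
      }
    }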

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/LoggerTooFarBehindException.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+
+class LoggerTooFarBehindException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumCall.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+/**
+ * Represents a set of calls for which a quorum of results is needed.
+ * @param <KEY> a key used to identify each of the outgoing calls
+ * @param <RESULT> the type of the call result
+ */
+class QuorumCall<KEY, RESULT> {
+  private final Map<KEY, RESULT> successes = Maps.newHashMap();
+  private final Map<KEY, Throwable> exceptions = Maps.newHashMap();
+  
+  static <KEY, RESULT> QuorumCall<KEY, RESULT> create(
+      Map<KEY, ? extends ListenableFuture<RESULT>> calls) {
+    final QuorumCall<KEY, RESULT> qr = new QuorumCall<KEY, RESULT>();
+    for (final Entry<KEY, ? extends ListenableFuture<RESULT>> e : calls.entrySet()) {
+      Preconditions.checkArgument(e.getValue() != null,
+          "null future for key: " + e.getKey());
+      Futures.addCallback(e.getValue(), new FutureCallback<RESULT>() {
+        @Override
+        public void onFailure(Throwable t) {
+          qr.addException(e.getKey(), t);
+        }
+
+        @Override
+        public void onSuccess(RESULT res) {
+          qr.addResult(e.getKey(), res);
+        }
+      });
+    }
+    return qr;
+  }
+  
+  private QuorumCall() {
+    // Only instantiated from factory method above
+  }
+  
+  /**
+   * Wait for the quorum to achieve a certain number of responses.
+   * 
+   * Note that, even after this returns, more responses may arrive,
+   * causing the return value of other methods in this class to change.
+   *
+   * @param minResponses return as soon as this many responses have been
+   * received, regardless of whether they are successes or exceptions
+   * @param minSuccesses return as soon as this many successful (non-exception)
+   * responses have been received
+   * @param maxExceptions return as soon as more than this many exception
+   * responses have been received. Pass 0 to return immediately if any
+   * exception is received.
+   * @param millis the number of milliseconds to wait for
+   * @throws InterruptedException if the thread is interrupted while waiting
+   * @throws TimeoutException if the specified timeout elapses before
+   * achieving the desired conditions
+   */
+  public synchronized void waitFor(
+      int minResponses, int minSuccesses, int maxExceptions,
+      int millis)
+      throws InterruptedException, TimeoutException {
+    long et = Time.monotonicNow() + millis;
+    while (true) {
+      if (minResponses > 0 && countResponses() >= minResponses) return;
+      if (minSuccesses > 0 && countSuccesses() >= minSuccesses) return;
+      if (maxExceptions >= 0 && countExceptions() > maxExceptions) return;
+      long rem = et - Time.monotonicNow();
+      if (rem <= 0) {
+        throw new TimeoutException();
+      }
+      wait(rem);
+    }
+  }
+
+  private synchronized void addResult(KEY k, RESULT res) {
+    successes.put(k, res);
+    notifyAll();
+  }
+  
+  private synchronized void addException(KEY k, Throwable t) {
+    exceptions.put(k, t);
+    notifyAll();
+  }
+  
+  /**
+   * @return the total number of calls for which a response has been received,
+   * regardless of whether it threw an exception or returned a successful
+   * result.
+   */
+  public synchronized int countResponses() {
+    return successes.size() + exceptions.size();
+  }
+  
+  /**
+   * @return the number of calls for which a non-exception response has been
+   * received.
+   */
+  public synchronized int countSuccesses() {
+    return successes.size();
+  }
+  
+  /**
+   * @return the number of calls for which an exception response has been
+   * received.
+   */
+  public synchronized int countExceptions() {
+    return exceptions.size();
+  }
+
+  /**
+   * @return the map of successful responses. A copy is made such that this
+   * map will not be further mutated, even if further results arrive for the
+   * quorum.
+   */
+  public synchronized Map<KEY, RESULT> getResults() {
+    return Maps.newHashMap(successes);
+  }
+
+  public synchronized void rethrowException(String msg) throws QuorumException {
+    Preconditions.checkState(!exceptions.isEmpty());
+    throw QuorumException.create(msg, successes, exceptions);
+  }
+
+  public static <K> String mapToString(
+      Map<K, ? extends Message> map) {
+    StringBuilder sb = new StringBuilder();
+    boolean first = true;
+    for (Map.Entry<K, ? extends Message> e : map.entrySet()) {
+      if (!first) {
+        sb.append("\n");
+      }
+      first = false;
+      sb.append(e.getKey()).append(": ")
+        .append(TextFormat.shortDebugString(e.getValue()));
+    }
+    return sb.toString();
+  }
+}
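A usage sketch for waitFor (hypothetical test driver; because QuorumCall is
package-private this would have to live in the same package, and the names
below are illustrative):

    package org.apache.hadoop.hdfs.qjournal.client;

    import java.util.Map;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.util.concurrent.SettableFuture;

    public class QuorumCallExample {
      public static void main(String[] args) throws Exception {
        SettableFuture<String> a = SettableFuture.create();
        SettableFuture<String> b = SettableFuture.create();
        SettableFuture<String> c = SettableFuture.create();
        Map<String, SettableFuture<String>> calls =
            ImmutableMap.of("jn1", a, "jn2", b, "jn3", c);
        QuorumCall<String, String> q = QuorumCall.create(calls);

        a.set("ok");
        b.set("ok");
        // Returns immediately: 2 of 3 successes meets minSuccesses=2.
        q.waitFor(3, 2, 2, 1000);
        System.out.println("successes: " + q.countSuccesses()); // 2
      }
    }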

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumException.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+
+/**
+ * Exception thrown when too many exceptions occur while gathering
+ * responses to a quorum call. 
+ */
+class QuorumException extends IOException {
+
+  /**
+   * Create a QuorumException instance with a descriptive message detailing
+   * the underlying exceptions, as well as any successful responses which
+   * were returned.
+   * @param <K> the key type for the quorum calls
+   * @param <V> the success response type
+   * @param simpleMsg a short summary message for the failure
+   * @param successes any successful responses returned
+   * @param exceptions the exceptions returned
+   */
+  public static <K, V> QuorumException create(
+      String simpleMsg,
+      Map<K, V> successes,
+      Map<K, Throwable> exceptions) {
+    Preconditions.checkArgument(!exceptions.isEmpty(),
+        "Must pass exceptions");
+    
+    StringBuilder msg = new StringBuilder();
+    msg.append(simpleMsg).append(". ");
+    if (!successes.isEmpty()) {
+      msg.append(successes.size()).append(" successful responses:\n");
+      Joiner.on("\n")
+          .useForNull("null")
+          .withKeyValueSeparator(": ")
+          .appendTo(msg, successes);
+      msg.append("\n");
+    }
+    msg.append(exceptions.size() + " exceptions thrown:\n");
+    boolean isFirst = true;
+    
+    for (Map.Entry<K, Throwable> e : exceptions.entrySet()) {
+      if (!isFirst) {
+        msg.append("\n");
+      }
+      isFirst = false;
+      
+      msg.append(e.getKey()).append(": ");
+      
+      if (e.getValue() instanceof RuntimeException) {
+        msg.append(StringUtils.stringifyException(e.getValue()));
+      } else {
+        msg.append(e.getValue().getLocalizedMessage());
+      }
+    }
+    return new QuorumException(msg.toString());
+  }
+
+  private QuorumException(String msg) {
+    super(msg);
+  }
+
+  private static final long serialVersionUID = 1L;
+}

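A hypothetical invocation of create() for a three-node call, showing how successes and failures are folded into a single message (host names and errors invented for illustration):

    Map<String, String> oks = Maps.newHashMap();
    oks.put("jn1.example.com:8485", "ok");
    Map<String, Throwable> errs = Maps.newHashMap();
    errs.put("jn2.example.com:8485", new IOException("Connection refused"));
    errs.put("jn3.example.com:8485", new IOException("No such segment"));
    throw QuorumException.create("finalizeLogSegment failed", oks, errs);

The resulting message reports "1 successful responses" followed by "2 exceptions thrown". Note that only RuntimeExceptions get full stack traces; expected IO failures are reduced to their localized message, which keeps them from flooding the NameNode log.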
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+
+/**
+ * A JournalManager that writes to a set of remote JournalNodes,
+ * requiring a quorum of nodes to ack each write.
+ */
+@InterfaceAudience.Private
+public class QuorumJournalManager implements JournalManager {
+  static final Log LOG = LogFactory.getLog(QuorumJournalManager.class);
+
+  // Timeouts for which the QJM will wait for each of the following actions.
+  private final int startSegmentTimeoutMs;
+  private final int prepareRecoveryTimeoutMs;
+  private final int acceptRecoveryTimeoutMs;
+  private final int finalizeSegmentTimeoutMs;
+  private final int selectInputStreamsTimeoutMs;
+  
+  private final Configuration conf;
+  private final URI uri;
+  private final NamespaceInfo nsInfo;
+  private boolean isActiveWriter;
+  
+  private final AsyncLoggerSet loggers;
+  
+  public QuorumJournalManager(Configuration conf,
+      URI uri, NamespaceInfo nsInfo) throws IOException {
+    Preconditions.checkArgument(conf != null, "must be configured");
+
+    this.conf = conf;
+    this.uri = uri;
+    this.nsInfo = nsInfo;
+    this.loggers = new AsyncLoggerSet(createLoggers());
+
+    // Configure timeouts.
+    this.startSegmentTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_START_SEGMENT_TIMEOUT_DEFAULT);
+    this.prepareRecoveryTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_PREPARE_RECOVERY_TIMEOUT_DEFAULT);
+    this.acceptRecoveryTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_ACCEPT_RECOVERY_TIMEOUT_DEFAULT);
+    this.finalizeSegmentTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_FINALIZE_SEGMENT_TIMEOUT_DEFAULT);
+    this.selectInputStreamsTimeoutMs = conf.getInt(
+        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_KEY,
+        DFSConfigKeys.DFS_QJOURNAL_SELECT_INPUT_STREAMS_TIMEOUT_DEFAULT);
+  }
+  
+  static String parseJournalId(URI uri) {
+    String path = uri.getPath();
+    Preconditions.checkArgument(path != null && !path.isEmpty(),
+        "Bad URI '%s': must identify journal in path component",
+        uri);
+    String journalId = path.substring(1);
+    checkJournalId(journalId);
+    return journalId;
+  }
+  
+  public static void checkJournalId(String jid) {
+    Preconditions.checkArgument(jid != null &&
+        !jid.isEmpty() &&
+        !jid.contains("/") &&
+        !jid.startsWith("."),
+        "bad journal id: " + jid);
+  }
+
+  /**
+   * Run recovery/synchronization for a specific segment.
+   * Postconditions:
+   * <ul>
+   * <li>This segment will be finalized on a majority
+   * of nodes.</li>
+   * <li>All nodes which contain the finalized segment will
+   * agree on the length.</li>
+   * </ul>
+   * 
+   * @param segmentTxId the starting txid of the segment
+   * @throws IOException
+   */
+  private void recoverUnclosedSegment(long segmentTxId) throws IOException {
+    Preconditions.checkArgument(segmentTxId > 0);
+    LOG.info("Beginning recovery of unclosed segment starting at txid " +
+        segmentTxId);
+    
+    // Step 1. Prepare recovery
+    QuorumCall<AsyncLogger,PrepareRecoveryResponseProto> prepare =
+        loggers.prepareRecovery(segmentTxId);
+    Map<AsyncLogger, PrepareRecoveryResponseProto> prepareResponses =
+        loggers.waitForWriteQuorum(prepare, prepareRecoveryTimeoutMs);
+    LOG.info("Recovery prepare phase complete. Responses: " +
+        QuorumCall.mapToString(prepareResponses));
+
+    // Determine which logger either:
+    // a) has already accepted a previous proposal that's higher than any
+    //    other
+    //
+    //  OR, if no such logger exists:
+    //
+    // b) has the longest log starting at this transaction ID
+    
+    // TODO: we should collect any "ties" and pass the URL for all of them
+    // when syncing, so we can tolerate failure during recovery better.
+    Entry<AsyncLogger, PrepareRecoveryResponseProto> bestEntry = Collections.max(
+        prepareResponses.entrySet(), RECOVERY_COMPARATOR); 
+    AsyncLogger bestLogger = bestEntry.getKey();
+    PrepareRecoveryResponseProto bestResponse = bestEntry.getValue();
+    
+    // Log the above decision, check invariants.
+    if (bestResponse.hasAcceptedInEpoch()) {
+      LOG.info("Using already-accepted recovery for segment " +
+          "starting at txid " + segmentTxId + ": " +
+          bestEntry);
+    } else if (bestResponse.hasSegmentState()) {
+      LOG.info("Using longest log: " + bestEntry);
+    } else {
+      // TODO: can we get here? what about the following case:
+      // - 3 JNs, JN1, JN2, JN3
+      // - writer starts segment 101 on JN1, then crashes
+      // - during newEpoch(), we saw the segment on JN1 and decide to recover segment 101
+      // - during prepare(), JN1 has actually crashed, and we only talk to JN2 and JN3,
+      //   neither of which has any entry for this log.
+      // Write a test case.
+      throw new AssertionError("None of the responses " +
+          "had a log to recover: " + QuorumCall.mapToString(prepareResponses));
+    }
+    
+    // TODO: check that md5s match up between any "tied" logs
+    
+    SegmentStateProto logToSync = bestResponse.getSegmentState();
+    assert segmentTxId == logToSync.getStartTxId();
+    
+    URL syncFromUrl = bestLogger.buildURLToFetchLogs(segmentTxId);
+    
+    QuorumCall<AsyncLogger,Void> accept = loggers.acceptRecovery(logToSync, syncFromUrl);
+    loggers.waitForWriteQuorum(accept, acceptRecoveryTimeoutMs);
+    
+    // TODO:
+    // We should only try to finalize loggers which successfully synced above;
+    // e.g. if a logger was down, we don't want to send it the finalize
+    // request. Write a test for this!
+    
+    QuorumCall<AsyncLogger, Void> finalize =
+        loggers.finalizeLogSegment(logToSync.getStartTxId(), logToSync.getEndTxId()); 
+    loggers.waitForWriteQuorum(finalize, finalizeSegmentTimeoutMs);
+  }
+  
+  private static final Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>> RECOVERY_COMPARATOR =
+      new Comparator<Entry<AsyncLogger, PrepareRecoveryResponseProto>>() {
+      @Override
+      public int compare(
+          Entry<AsyncLogger, PrepareRecoveryResponseProto> a,
+          Entry<AsyncLogger, PrepareRecoveryResponseProto> b) {
+        
+        PrepareRecoveryResponseProto r1 = a.getValue();
+        PrepareRecoveryResponseProto r2 = b.getValue();
+        
+        if (r1.hasSegmentState() && r2.hasSegmentState()) {
+          assert r1.getSegmentState().getStartTxId() ==
+              r2.getSegmentState().getStartTxId() : "bad args: " + r1 + ", " + r2;
+        }
+        
+        return ComparisonChain.start()
+            // If one of them has accepted something and the other hasn't,
+            // use the one with an accepted recovery
+            .compare(r1.hasAcceptedInEpoch(), r2.hasAcceptedInEpoch())
+            // If they both accepted, use the one that's more recent
+            .compare(r1.getAcceptedInEpoch(),
+                     r2.getAcceptedInEpoch())
+            // Otherwise, choose based on which log is longer
+            .compare(r1.hasSegmentState(), r2.hasSegmentState())
+            .compare(r1.getSegmentState().getEndTxId(), r2.getSegmentState().getEndTxId())
+            .result();
+      }
+  };
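+  // A worked example of the ordering above (values invented, not from this
+  // patch): if r1 carries acceptedInEpoch=2 and r2 has no accepted proposal
+  // but a longer segment, r1 is preferred, since a previously accepted
+  // recovery proposal must win over mere log length; between two responses
+  // with no accepted proposal, the segment with the greater endTxId wins.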
+
+  protected List<AsyncLogger> createLoggers() throws IOException {
+    return createLoggers(conf, uri, nsInfo);
+  }
+  
+  static List<AsyncLogger> createLoggers(Configuration conf,
+      URI uri, NamespaceInfo nsInfo) throws IOException {
+    List<AsyncLogger> ret = Lists.newArrayList();
+    List<InetSocketAddress> addrs = getLoggerAddresses(uri);
+    String jid = parseJournalId(uri);
+    for (InetSocketAddress addr : addrs) {
+      ret.add(new IPCLoggerChannel(conf, nsInfo, jid, addr));
+    }
+    return ret;
+  }
+ 
+  private static List<InetSocketAddress> getLoggerAddresses(URI uri)
+      throws IOException {
+    String authority = uri.getAuthority();
+    Preconditions.checkArgument(authority != null && !authority.isEmpty(),
+        "URI has no authority: " + uri);
+    
+    String[] parts = StringUtils.split(authority, ';');
+    for (int i = 0; i < parts.length; i++) {
+      parts[i] = parts[i].trim();
+    }
+
+    if (parts.length % 2 == 0) {
+      LOG.warn("Quorum journal URI '" + uri + "' has an even number " +
+          "of Journal Nodes specified. This is not recommended!");
+    }
+    
+    List<InetSocketAddress> addrs = Lists.newArrayList();
+    for (String addr : parts) {
+      addrs.add(NetUtils.createSocketAddr(
+          addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT));
+    }
+    return addrs;
+  }
+
+  @Override
+  public EditLogOutputStream startLogSegment(long txId) throws IOException {
+    Preconditions.checkState(isActiveWriter,
+        "must recover segments before starting a new one");
+    QuorumCall<AsyncLogger,Void> q = loggers.startLogSegment(txId);
+    loggers.waitForWriteQuorum(q, startSegmentTimeoutMs);
+    return new QuorumOutputStream(loggers);
+  }
+
+  @Override
+  public void finalizeLogSegment(long firstTxId, long lastTxId)
+      throws IOException {
+    QuorumCall<AsyncLogger,Void> q = loggers.finalizeLogSegment(
+        firstTxId, lastTxId);
+    loggers.waitForWriteQuorum(q, finalizeSegmentTimeoutMs);
+  }
+
+  @Override
+  public void setOutputBufferCapacity(int size) {
+    // TODO: not yet implemented for the quorum journal
+  }
+
+  @Override
+  public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+    // TODO: not yet implemented for the quorum journal
+  }
+
+  @Override
+  public void recoverUnfinalizedSegments() throws IOException {
+    Preconditions.checkState(!isActiveWriter, "already active writer");
+    
+    Map<AsyncLogger, NewEpochResponseProto> resps =
+        loggers.createNewUniqueEpoch(nsInfo);
+    LOG.info("newEpoch(" + loggers.getEpoch() + ") responses:\n" +
+        QuorumCall.mapToString(resps));
+    
+    long mostRecentSegmentTxId = Long.MIN_VALUE;
+    for (NewEpochResponseProto r : resps.values()) {
+      if (r.hasLastSegmentTxId()) {
+        mostRecentSegmentTxId = Math.max(mostRecentSegmentTxId,
+            r.getLastSegmentTxId());
+      }
+    }
+    
+    // On a completely fresh system, none of the journals have any
+    // segments, so there's nothing to recover.
+    if (mostRecentSegmentTxId != Long.MIN_VALUE) {
+      recoverUnclosedSegment(mostRecentSegmentTxId);
+    }
+    isActiveWriter = true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    loggers.close();
+  }
+
+  @Override
+  public void selectInputStreams(Collection<EditLogInputStream> streams,
+      long fromTxnId, boolean inProgressOk) {
+
+    QuorumCall<AsyncLogger,GetEditLogManifestResponseProto> q =
+        loggers.getEditLogManifest(fromTxnId);
+    Map<AsyncLogger, GetEditLogManifestResponseProto> resps;
+    try {
+      resps = loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs);
+    } catch (IOException ioe) {
+      // TODO: can we do better here?
+      throw new RuntimeException(ioe);
+    }
+    
+    LOG.info("selectInputStream manifests:\n" +
+        QuorumCall.mapToString(resps));
+    
+    final PriorityQueue<EditLogInputStream> allStreams = 
+        new PriorityQueue<EditLogInputStream>(64,
+            JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
+    for (Map.Entry<AsyncLogger, GetEditLogManifestResponseProto> e : resps.entrySet()) {
+      AsyncLogger logger = e.getKey();
+      GetEditLogManifestResponseProto response = e.getValue();
+      RemoteEditLogManifest manifest = PBHelper.convert(response.getManifest());
+      
+      for (RemoteEditLog remoteLog : manifest.getLogs()) {
+        URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
+        LOG.info("URL: " + url);
+
+        EditLogInputStream elis = EditLogFileInputStream.fromUrl(
+            url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
+            remoteLog.isInProgress());
+        allStreams.add(elis);
+      }
+    }
+    JournalSet.chainAndMakeRedundantStreams(
+        streams, allStreams, fromTxnId, inProgressOk);
+  }
+  
+  @Override
+  public String toString() {
+    return "Quorum journal manager " + uri;
+  }
+
+  @VisibleForTesting
+  AsyncLoggerSet getLoggerSetForTests() {
+    return loggers;
+  }
+
+}

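The parsing helpers above fix the shape of a quorum journal URI: the JournalNode addresses live in the URI authority, separated by ';', and the journal identifier is the sole path component. A sketch (the "qjournal" scheme shown is illustrative; only the authority and path matter to parseJournalId() and getLoggerAddresses()):

    URI uri = new URI("qjournal://jn1:8485;jn2:8485;jn3:8485/mycluster");
    QuorumJournalManager qjm = new QuorumJournalManager(conf, uri, nsInfo);
    // parseJournalId(uri)     -> "mycluster"
    // getLoggerAddresses(uri) -> [jn1:8485, jn2:8485, jn3:8485]

An odd node count is expected, per the warning in getLoggerAddresses(): with 2N+1 nodes, the write quorum of N+1 tolerates N failures, while an even count adds no extra fault tolerance.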
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.EditsDoubleBuffer;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+/**
+ * EditLogOutputStream implementation that writes to a quorum of
+ * remote journals.
+ */
+class QuorumOutputStream extends EditLogOutputStream {
+  private final AsyncLoggerSet loggers;
+  private EditsDoubleBuffer buf;
+
+  public QuorumOutputStream(AsyncLoggerSet loggers) throws IOException {
+    super();
+    this.buf = new EditsDoubleBuffer(256*1024); // TODO: conf
+    this.loggers = loggers;
+  }
+
+  @Override
+  public void write(FSEditLogOp op) throws IOException {
+    buf.writeOp(op);
+  }
+
+  @Override
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+    buf.writeRaw(bytes, offset, length);
+  }
+
+  @Override
+  public void create() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (buf != null) {
+      buf.close();
+      buf = null;
+    }
+  }
+
+  @Override
+  public void abort() throws IOException {
+    QuorumJournalManager.LOG.warn("Aborting " + this);
+    buf = null;
+    close();
+  }
+
+  @Override
+  public void setReadyToFlush() throws IOException {
+    buf.setReadyToFlush();
+  }
+
+  @Override
+  protected void flushAndSync() throws IOException {
+    int numReadyBytes = buf.countReadyBytes();
+    if (numReadyBytes > 0) {
+      int numReadyTxns = buf.countReadyTxns();
+      long firstTxToFlush = buf.getFirstReadyTxId();
+
+      assert numReadyTxns > 0;
+
+      // Copy from our double-buffer into a new byte array. This is for
+      // two reasons:
+      // 1) The IPC code has no way of specifying to send only a slice of
+      //    a larger array.
+      // 2) because the calls to the underlying nodes are asynchronous, we
+      //    need a defensive copy to avoid accidentally mutating the buffer
+      //    before it is sent.
+      DataOutputBuffer bufToSend = new DataOutputBuffer(numReadyBytes);
+      buf.flushTo(bufToSend);
+      assert bufToSend.getLength() == numReadyBytes;
+      byte[] data = bufToSend.getData();
+      assert data.length == bufToSend.getLength();
+
+      QuorumCall<AsyncLogger, Void> qcall = loggers.sendEdits(
+          firstTxToFlush, numReadyTxns, data);
+      loggers.waitForWriteQuorum(qcall, 20000); // TODO: configurable timeout
+    }
+  }
+}

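Because flushAndSync() above ships only the half of the double buffer that has been marked ready, the caller drives the stream in the usual EditLogOutputStream rhythm. A hedged sketch of that sequence, mirroring how FSEditLog drives any journal stream ("op" stands for an arbitrary FSEditLogOp):

    EditLogOutputStream out = qjm.startLogSegment(101);
    out.write(op);          // lands in the in-memory double buffer only
    out.setReadyToFlush();  // swap halves; later writes go to the fresh half
    out.flush();            // triggers flushAndSync(): defensive copy,
                            // sendEdits() to all loggers, wait for quorum ack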
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to communicate between {@link QuorumJournalManager}
+ * and each {@link JournalNode}.
+ * 
+ * This is responsible for sending edits as well as coordinating
+ * recovery of the nodes.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@InterfaceAudience.Private
+public interface QJournalProtocol {
+  public static final long versionID = 1L;
+
+  /**
+   * Get the current state of the journal, including the most recent
+   * epoch number and the HTTP port.
+   */
+  public GetJournalStateResponseProto getJournalState(String journalId)
+      throws IOException;
+  
+  /**
+   * Begin a new epoch. See the HDFS-3077 design doc for details.
+   */
+  public NewEpochResponseProto newEpoch(String journalId,
+      NamespaceInfo nsInfo, long epoch) throws IOException;
+  
+  /**
+   * Journal edit records.
+   * This message is sent by the active NameNode to the JournalNodes
+   * to write edits to their local logs.
+   */
+  public void journal(RequestInfo reqInfo,
+                      long firstTxnId,
+                      int numTxns,
+                      byte[] records) throws IOException;
+
+  /**
+   * Start writing to a new log segment on the JournalNode.
+   * Before calling this, one should finalize the previous segment
+   * using {@link #finalizeLogSegment(RequestInfo, long, long)}.
+   * 
+   * @param txid the first txid in the new log
+   */
+  public void startLogSegment(RequestInfo reqInfo,
+      long txid) throws IOException;
+
+  /**
+   * Finalize the given log segment on the JournalNode. The segment
+   * is expected to be in-progress and starting at the given startTxId.
+   *
+   * @param startTxId the starting transaction ID of the log
+   * @param endTxId the expected last transaction in the given log
+   * @throws IOException if no such segment exists
+   */
+  public void finalizeLogSegment(RequestInfo reqInfo,
+      long startTxId, long endTxId) throws IOException;
+
+  /**
+   * @param jid the journal from which to enumerate edits
+   * @param sinceTxId the first transaction which the client cares about
+   * @return a list of edit log segments since the given transaction ID.
+   */
+  public GetEditLogManifestResponseProto getEditLogManifest(
+      String jid, long sinceTxId) throws IOException;
+  
+  /**
+   * Begin the recovery process for a given segment. See the HDFS-3077
+   * design document for details.
+   */
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException;
+
+  /**
+   * Accept a proposed recovery for the given transaction ID.
+   */
+  public void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
+
+}

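Read end to end, the interface above implies the writer's life cycle against each JournalNode. A hypothetical happy-path sequence for a single node, ignoring the quorum layer (the getLastPromisedEpoch() accessor is an assumed name; the generated proto classes are not part of this hunk):

    GetJournalStateResponseProto state = proto.getJournalState("mycluster");
    long epoch = state.getLastPromisedEpoch() + 1;  // assumed field name
    proto.newEpoch("mycluster", nsInfo, epoch);     // fences older writers
    RequestInfo reqInfo = new RequestInfo("mycluster", epoch, 1);
    proto.startLogSegment(reqInfo, 101);
    proto.journal(reqInfo, 101, 1, serializedOps);  // one edit record
    proto.finalizeLogSegment(reqInfo, 101, 101);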
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class RequestInfo {
+  private String jid;
+  private long epoch;
+  private long ipcSerialNumber;
+  
+  public RequestInfo(String jid, long epoch, long ipcSerialNumber) {
+    this.jid = jid;
+    this.epoch = epoch;
+    this.ipcSerialNumber = ipcSerialNumber;
+  }
+
+  public long getEpoch() {
+    return epoch;
+  }
+
+  public void setEpoch(long epoch) {
+    this.epoch = epoch;
+  }
+  
+  public String getJournalId() {
+    return jid;
+  }
+
+  public long getIpcSerialNumber() {
+    return ipcSerialNumber;
+  }
+
+  public void setIpcSerialNumber(long ipcSerialNumber) {
+    this.ipcSerialNumber = ipcSerialNumber;
+  }
+
+}

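RequestInfo is the fencing token: every mutating call in QJournalProtocol carries the writer's epoch plus an IPC serial number so a JournalNode can reject stale or reordered writers. A hedged sketch of the server-side check this implies (the JournalNode classes live elsewhere in this patch; the names here are invented):

    // Assumed to be persisted by the JournalNode across restarts.
    private long lastPromisedEpoch;

    private void checkRequest(RequestInfo reqInfo) throws IOException {
      if (reqInfo.getEpoch() < lastPromisedEpoch) {
        throw new IOException("Epoch " + reqInfo.getEpoch() +
            " is less than the last promised epoch " + lastPromisedEpoch);
      }
    }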
Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolPB.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protocol used to journal edits to a JournalNode participating
+ * in the quorum journal.
+ * Note: This extends the protocolbuffer service based interface to
+ * add annotations required for security.
+ */
+@KerberosInfo(
+    serverPrincipal = DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY,
+    clientPrincipal = DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY)
+@ProtocolInfo(protocolName = 
+    "org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol",
+    protocolVersion = 1)
+@InterfaceAudience.Private
+public interface QJournalProtocolPB extends
+    QJournalProtocolService.BlockingInterface {
+}

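QJournalProtocolPB itself is never implemented by hand; it is just the generated blocking interface plus the security and protocol annotations. A rough sketch of how a client-side proxy is typically wired up through standard Hadoop RPC (the real wiring lives in IPCLoggerChannel and the translator classes added elsewhere in this patch):

    RPC.setProtocolEngine(conf, QJournalProtocolPB.class,
        ProtobufRpcEngine.class);
    QJournalProtocolPB proxy = RPC.getProxy(QJournalProtocolPB.class,
        RPC.getProtocolVersion(QJournalProtocolPB.class), addr, conf);
    // The proxy is then wrapped in QJournalProtocolTranslatorPB, which
    // converts between QJournalProtocol and the generated proto messages.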

