hadoop-hdfs-commits mailing list archives

From t...@apache.org
Subject svn commit: r1363596 [3/3] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs...
Date Fri, 20 Jul 2012 00:25:52 GMT
Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Fri Jul 20 00:25:50 2012
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ComparisonChain;
 
-public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
+public class RemoteEditLog implements Comparable<RemoteEditLog> {
   private long startTxId = HdfsConstants.INVALID_TXID;
   private long endTxId = HdfsConstants.INVALID_TXID;
+  private boolean isInProgress = false;
   
   public RemoteEditLog() {
   }
@@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr
   public RemoteEditLog(long startTxId, long endTxId) {
     this.startTxId = startTxId;
     this.endTxId = endTxId;
+    this.isInProgress = (endTxId == HdfsConstants.INVALID_TXID);
+  }
+  
+  public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) {
+    this.startTxId = startTxId;
+    this.endTxId = endTxId;
+    this.isInProgress = inProgress;
   }
 
   public long getStartTxId() {
@@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr
   public long getEndTxId() {
     return endTxId;
   }
-    
-  @Override
-  public String toString() {
-    return "[" + startTxId + "," + endTxId + "]";
-  }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(startTxId);
-    out.writeLong(endTxId);
+  public boolean isInProgress() {
+    return isInProgress;
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    startTxId = in.readLong();
-    endTxId = in.readLong();
+  public String toString() {
+    if (!isInProgress) {
+      return "[" + startTxId + "," + endTxId + "]";
+    } else {
+      return "[" + startTxId + "-? (in-progress)]";
+    }
   }
   
   @Override
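
With the Writable read/write methods removed (serialization now goes through RemoteEditLogProto in hdfs.proto, below), RemoteEditLog is plain data. A minimal sketch of the new in-progress semantics, using only the constructors shown in this hunk:

    // Finalized segment: both transaction IDs are known.
    RemoteEditLog finalized = new RemoteEditLog(1, 100);
    System.out.println(finalized);           // [1,100]

    // In-progress segment: endTxId stays at HdfsConstants.INVALID_TXID,
    // so the two-arg constructor infers isInProgress = true.
    RemoteEditLog open = new RemoteEditLog(101, HdfsConstants.INVALID_TXID);
    System.out.println(open.isInProgress()); // true
    System.out.println(open);                // [101-? (in-progress)]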

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Fri Jul 20 00:25:50 2012
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.hdfs.qjournal.protocol";
+option java_outer_classname = "QJournalProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto";
+
+message JournalIdProto {
+  required string identifier = 1;
+}
+
+message RequestInfoProto {
+  required JournalIdProto journalId = 1;
+  required uint64 epoch = 2;
+  required uint64 ipcSerialNumber = 3;
+}
+
+message SegmentStateProto {
+  required uint64 startTxId = 1;
+  required uint64 endTxId = 2;
+  required bool isInProgress = 3;
+  required bytes md5sum = 4;
+}
+
+/**
+ * The storage format used on local disk for previously
+ * accepted decisions.
+ */
+message PersistedRecoveryPaxosData {
+  required SegmentStateProto segmentState = 1;
+  required uint64 acceptedInEpoch = 2;
+}
+
+/**
+ * journal()
+ */
+
+message JournalRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 firstTxnId = 2;
+  required uint32 numTxns = 3;
+  required bytes records = 4;
+}
+
+message JournalResponseProto { 
+}
+
+/**
+ * startLogSegment()
+ */
+message StartLogSegmentRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 txid = 2; // Transaction ID
+}
+
+message StartLogSegmentResponseProto { 
+}
+
+/**
+ * finalizeLogSegment()
+ */
+message FinalizeLogSegmentRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 startTxId = 2;
+  required uint64 endTxId = 3;
+}
+
+message FinalizeLogSegmentResponseProto { 
+}
+
+/**
+ * getJournalState()
+ */
+message GetJournalStateRequestProto {
+  required JournalIdProto jid = 1;
+}
+
+message GetJournalStateResponseProto {
+  required uint64 lastPromisedEpoch = 1;
+  required uint32 httpPort = 2;
+}
+
+/**
+ * newEpoch()
+ */
+message NewEpochRequestProto {
+  required JournalIdProto jid = 1;
+  required NamespaceInfoProto nsInfo = 2;
+  required uint64 epoch = 3;
+}
+
+message NewEpochResponseProto {
+  optional uint64 lastSegmentTxId = 1;
+}
+
+/**
+ * getEditLogManifest()
+ */
+message GetEditLogManifestRequestProto {
+  required JournalIdProto jid = 1;
+  required uint64 sinceTxId = 2;  // Transaction ID
+}
+
+message GetEditLogManifestResponseProto {
+  required RemoteEditLogManifestProto manifest = 1; 
+  required uint32 httpPort = 2;
+
+  // TODO: we should add nsInfo somewhere to verify that it matches
+  // up with our expectation, e.g.:
+  // required NamespaceInfoProto nsInfo = 3;
+}
+
+/**
+ * prepareRecovery()
+ */
+message PrepareRecoveryRequestProto {
+  required RequestInfoProto reqInfo = 1;
+  required uint64 segmentTxId = 2;
+}
+
+message PrepareRecoveryResponseProto {
+  optional SegmentStateProto segmentState = 1;
+  optional uint64 acceptedInEpoch = 2;
+}
+
+/**
+ * acceptRecovery()
+ */
+message AcceptRecoveryRequestProto {
+  required RequestInfoProto reqInfo = 1;
+
+  /** Details on the segment to recover */
+  required SegmentStateProto stateToAccept = 2;
+  
+  /** The URL from which the log may be copied */
+  required string fromURL = 3;
+}
+
+message AcceptRecoveryResponseProto {
+}
+
+
+/**
+ * Protocol used to journal edits to a JournalNode.
+ * See the request and response messages above for the details of each RPC call.
+ */
+service QJournalProtocolService {
+  rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto);
+
+  rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
+
+  rpc journal(JournalRequestProto) returns (JournalResponseProto);
+
+  rpc startLogSegment(StartLogSegmentRequestProto) 
+      returns (StartLogSegmentResponseProto);
+
+  rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
+      returns (FinalizeLogSegmentResponseProto);
+
+  rpc getEditLogManifest(GetEditLogManifestRequestProto)
+      returns (GetEditLogManifestResponseProto);
+
+  rpc prepareRecovery(PrepareRecoveryRequestProto)
+      returns (PrepareRecoveryResponseProto);
+
+  rpc acceptRecovery(AcceptRecoveryRequestProto)
+      returns (AcceptRecoveryResponseProto);
+}
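
Since the file sets java_package and java_outer_classname, protoc generates builder classes under org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos. As a rough sketch (names follow standard protobuf-java codegen, not code from this commit; editBytes is an illustrative placeholder), a journal() request could be assembled like so:

    RequestInfoProto reqInfo = RequestInfoProto.newBuilder()
        .setJournalId(JournalIdProto.newBuilder().setIdentifier("myjournal"))
        .setEpoch(1L)             // the writer's current epoch
        .setIpcSerialNumber(1L)   // increases with each call from this writer
        .build();
    JournalRequestProto req = JournalRequestProto.newBuilder()
        .setReqInfo(reqInfo)
        .setFirstTxnId(1L)
        .setNumTxns(3)
        .setRecords(ByteString.copyFrom(editBytes)) // packed edit-log records
        .build();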

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Fri Jul 20 00:25:50 2012
@@ -290,6 +290,7 @@ message BlocksWithLocationsProto {
 message RemoteEditLogProto {
   required uint64 startTxId = 1;  // Starting available edit log transaction
   required uint64 endTxId = 2;    // Ending available edit log transaction
+  optional bool isInProgress = 3 [default = false];
 }
 
 /**
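
Making the new field optional with a default keeps the change wire-compatible: a manifest serialized by older code simply decodes with isInProgress = false. An illustrative check, using the standard proto2 accessors on the generated HdfsProtos.RemoteEditLogProto:

    RemoteEditLogProto p = RemoteEditLogProto.newBuilder()
        .setStartTxId(1).setEndTxId(100)
        .build();                        // isInProgress never set
    assert !p.hasIsInProgress();         // absent from the wire encoding
    assert p.getIsInProgress() == false; // getter falls back to the default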

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 20 00:25:50 2012
@@ -241,6 +241,11 @@
 </property>
 
 <property>
+  <name>dfs.namenode.edits.journal-plugin.qjournal</name>
+  <value>org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager</value>
+</property>
+
+<property>
   <name>dfs.permissions.enabled</name>
   <value>true</value>
   <description>
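
This key maps the qjournal URI scheme to its JournalManager implementation, which lets a NameNode take a quorum URI directly as an edits directory; the TestNNWithQJM cases below configure it exactly this way. A sketch, with illustrative host:port values:

    Configuration conf = new HdfsConfiguration();
    // Semicolon-separated JournalNode addresses, then the journal identifier:
    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
        "qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/myjournal");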

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html Fri Jul 20 00:25:50 2012
@@ -0,0 +1,29 @@
+<meta HTTP-EQUIV="REFRESH" content="0;url=journalstatus.jsp"/>
+<html>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<head><title>Hadoop Administration</title></head>
+
+<body>
+<h1>Hadoop Administration</h1>
+
+<ul> 
+  <li><a href="journalstatus.jsp">Status</a></li> 
+</ul>
+
+</body> 
+</html>

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp Fri Jul 20 00:25:50 2012
@@ -0,0 +1,42 @@
+<%
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file 
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+%>
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="org.apache.hadoop.hdfs.server.common.JspHelper"
+  import="org.apache.hadoop.util.ServletUtil"
+%>
+<%!
+  //for java.io.Serializable
+  private static final long serialVersionUID = 1L;
+%>
+
+<!DOCTYPE html>
+<html>
+<link rel="stylesheet" type="text/css" href="/static/hadoop.css">
+<title>Hadoop JournalNode</title>
+    
+<body>
+<h1>JournalNode</h1>
+<%= JspHelper.getVersionTable() %>
+<hr />
+
+<br />
+<b><a href="/logs/">Logs</a></b>
+<%= ServletUtil.htmlFooter() %>

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml Fri Jul 20 00:25:50 2012
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<web-app version="2.4" xmlns="http://java.sun.com/xml/ns/j2ee">
+@journal.servlet.definitions@
+</web-app>

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Jul 20 00:25:50 2012
@@ -85,6 +85,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 
 /** Utilities for HDFS tests */
@@ -586,12 +587,21 @@ public class DFSTestUtil {
     IOUtils.copyBytes(is, os, s.length(), true);
   }
   
-  // Returns url content as string.
+  /**
+   * @return url content as string (UTF-8 encoding assumed)
+   */
   public static String urlGet(URL url) throws IOException {
+    return new String(urlGetBytes(url), Charsets.UTF_8);
+  }
+  
+  /**
+   * @return URL contents as a byte array
+   */
+  public static byte[] urlGetBytes(URL url) throws IOException {
     URLConnection conn = url.openConnection();
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     IOUtils.copyBytes(conn.getInputStream(), out, 4096, true);
-    return out.toString();
+    return out.toByteArray();
   }
   
   /**

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+public class MiniJournalCluster {
+  public static class Builder {
+    private String baseDir;
+    private int numJournalNodes = 3;
+    private boolean format = true;
+    private Configuration conf;
+    
+    public Builder(Configuration conf) {
+      this.conf = conf;
+    }
+    
+    public Builder baseDir(String d) {
+      this.baseDir = d;
+      return this;
+    }
+    
+    public Builder numJournalNodes(int n) {
+      this.numJournalNodes = n;
+      return this;
+    }
+
+    public Builder format(boolean f) {
+      this.format = f;
+      return this;
+    }
+
+    public MiniJournalCluster build() throws IOException {
+      return new MiniJournalCluster(this);
+    }
+  }
+
+  private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class);
+  private File baseDir;
+  private JournalNode nodes[];
+  private InetSocketAddress ipcAddrs[];
+  private InetSocketAddress httpAddrs[];
+  
+  private MiniJournalCluster(Builder b) throws IOException {
+    LOG.info("Starting MiniJournalCluster with " +
+        b.numJournalNodes + " journal nodes");
+    
+    if (b.baseDir != null) {
+      this.baseDir = new File(b.baseDir);
+    } else {
+      this.baseDir = new File(MiniDFSCluster.getBaseDirectory());
+    }
+    
+    nodes = new JournalNode[b.numJournalNodes];
+    ipcAddrs = new InetSocketAddress[b.numJournalNodes];
+    httpAddrs = new InetSocketAddress[b.numJournalNodes];
+    for (int i = 0; i < b.numJournalNodes; i++) {
+      if (b.format) {
+        File dir = getStorageDir(i);
+        LOG.debug("Fully deleting JN directory " + dir);
+        FileUtil.fullyDelete(dir);
+      }
+      nodes[i] = new JournalNode();
+      nodes[i].setConf(createConfForNode(b, i));
+      nodes[i].start();
+
+      ipcAddrs[i] = nodes[i].getBoundIpcAddress();
+      httpAddrs[i] = nodes[i].getBoundHttpAddress();
+    }
+  }
+
+  /**
+   * Return a qjournal:// URI pointing to the set of JournalNodes in this
+   * cluster, suitable for use as a NameNode edits directory.
+   */
+  public URI getQuorumJournalURI(String jid) {
+    List<String> addrs = Lists.newArrayList();
+    for (InetSocketAddress addr : ipcAddrs) {
+      addrs.add("127.0.0.1:" + addr.getPort());
+    }
+    String addrsVal = Joiner.on(";").join(addrs);
+    LOG.debug("Setting logger addresses to: " + addrsVal);
+    try {
+      return new URI("qjournal://" + addrsVal + "/" + jid);
+    } catch (URISyntaxException e) {
+      throw new AssertionError(e);
+    }
+  }
+
+  /**
+   * Start the JournalNodes in the cluster.
+   */
+  public void start() throws IOException {
+    for (JournalNode jn : nodes) {
+      jn.start();
+    }
+  }
+
+  /**
+   * Shut down all of the JournalNodes in the cluster.
+   * @throws IOException if one or more nodes failed to stop
+   */
+  public void shutdown() throws IOException {
+    boolean failed = false;
+    for (JournalNode jn : nodes) {
+      try {
+        jn.stopAndJoin(0);
+      } catch (Exception e) {
+        failed = true;
+        LOG.warn("Unable to stop journal node " + jn, e);
+      }
+    }
+    if (failed) {
+      throw new IOException("Unable to shut down. Check log for details");
+    }
+  }
+
+  private Configuration createConfForNode(Builder b, int idx) {
+    Configuration conf = new Configuration(b.conf);
+    File logDir = getStorageDir(idx);
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString());
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    return conf;
+  }
+
+  public File getStorageDir(int idx) {
+    return new File(baseDir, "journalnode-" + idx);
+  }
+  
+  public File getCurrentDir(int idx, String jid) {
+    return new File(new File(getStorageDir(idx), jid), "current");
+  }
+
+  public JournalNode getJournalNode(int i) {
+    return nodes[i];
+  }
+  
+  public void restartJournalNode(int i) throws InterruptedException, IOException {
+    Configuration conf = new Configuration(nodes[i].getConf());
+    if (nodes[i].isStarted()) {
+      nodes[i].stopAndJoin(0);
+    }
+    
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "127.0.0.1:" +
+        ipcAddrs[i].getPort());
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "127.0.0.1:" +
+        httpAddrs[i].getPort());
+    
+    JournalNode jn = new JournalNode();
+    jn.setConf(conf);
+    jn.start();
+    // Remember the restarted instance so getJournalNode(i) returns the
+    // live node rather than the stopped one.
+    nodes[i] = jn;
+  }
+
+  public int getQuorumSize() {
+    return nodes.length / 2 + 1;
+  }
+
+  public int getNumNodes() {
+    return nodes.length;
+  }
+
+}
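
A minimal usage sketch, mirroring the tests that follow; note that getQuorumSize() is a simple majority (2 of 3 nodes, 3 of 5):

    MiniJournalCluster cluster = new MiniJournalCluster.Builder(new Configuration())
        .numJournalNodes(3)
        .build();
    try {
      URI uri = cluster.getQuorumJournalURI("myjournal");
      // e.g. qjournal://127.0.0.1:port1;127.0.0.1:port2;127.0.0.1:port3/myjournal
      assert cluster.getQuorumSize() == 2; // writes need acks from 2 of the 3 nodes
    } finally {
      cluster.shutdown();
    }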

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+public abstract class QJMTestUtil {
+
+  public static byte[] createTxnData(int startTxn, int numTxns) throws Exception {
+    DataOutputBuffer buf = new DataOutputBuffer();
+    FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf);
+    
+    for (long txid = startTxn; txid < startTxn + numTxns; txid++) {
+      FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+      op.setTransactionId(txid);
+      writer.writeOp(op);
+    }
+    
+    return Arrays.copyOf(buf.getData(), buf.getLength());
+  }
+  
+}
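
The returned bytes are in the same packed-ops format that the journal() RPC carries in its records field. For instance, paired with the logger channel from the tests below (where ch is assumed to be an IPCLoggerChannel):

    byte[] data = QJMTestUtil.createTxnData(1, 3); // mkdir ops for txids 1..3
    ch.sendEdits(1L, 3, data);                     // firstTxnId = 1, numTxns = 3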

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.junit.Test;
+
+
+public class TestMiniJournalCluster {
+  @Test
+  public void testStartStop() throws IOException {
+    Configuration conf = new Configuration();
+    MiniJournalCluster c = new MiniJournalCluster.Builder(conf)
+      .build();
+    try {
+      URI uri = c.getQuorumJournalURI("myjournal");
+      String[] addrs = uri.getAuthority().split(";");
+      assertEquals(3, addrs.length);
+      
+      JournalNode node = c.getJournalNode(0);
+      String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY);
+      assertEquals(MiniDFSCluster.getBaseDirectory() + "journalnode-0",
+          dir);
+    } finally {
+      c.shutdown();
+    }
+  }
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNNWithQJM {
+  Configuration conf = new HdfsConfiguration();
+  private MiniJournalCluster mjc;
+  private Path TEST_PATH = new Path("/test-dir");
+  private Path TEST_PATH_2 = new Path("/test-dir-2");
+
+  @Before
+  public void resetSystemExit() {
+    ExitUtil.resetFirstExitException();
+  }
+  
+  @Before
+  public void startJNs() throws Exception {
+    mjc = new MiniJournalCluster.Builder(conf).build();
+  }
+  
+  @After
+  public void stopJNs() throws Exception {
+    if (mjc != null) {
+      mjc.shutdown();
+    }
+  }
+  
+  @Test
+  public void testLogAndRestart() throws IOException {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .build();
+    try {
+      cluster.getFileSystem().mkdirs(TEST_PATH);
+      
+      // Restart the NN and make sure the edit was persisted
+      // and loaded again
+      cluster.restartNameNode();
+      
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+      cluster.getFileSystem().mkdirs(TEST_PATH_2);
+      
+      // Restart the NN again and make sure both edits are persisted.
+      cluster.restartNameNode();
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH));
+      assertTrue(cluster.getFileSystem().exists(TEST_PATH_2));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  @Test
+  public void testNewNamenodeTakesOverWriter() throws Exception {
+    File nn1Dir = new File(
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn1");
+    File nn2Dir = new File(
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn2");
+    
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        nn1Dir.getAbsolutePath());
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .checkExitOnShutdown(false)
+      .build();
+
+    try {
+      cluster.getFileSystem().mkdirs(TEST_PATH);
+      
+      // Start a second NN pointed to the same quorum.
+      // We need to copy the image dir from the first NN -- or else
+      // the new NN would just be rejected due to a namespace mismatch.
+      FileUtil.fullyDelete(nn2Dir);
+      FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(),
+          new Path(nn2Dir.getAbsolutePath()), false, conf);
+      
+      Configuration conf2 = new Configuration();
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          nn2Dir.getAbsolutePath());
+      conf2.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+          mjc.getQuorumJournalURI("myjournal").toString());
+      MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2)
+        .numDataNodes(0)
+        .format(false)
+        .manageNameDfsDirs(false)
+        .build();
+      
+      // Check that the new cluster sees the edits made on the old cluster
+      try {
+        assertTrue(cluster2.getFileSystem().exists(TEST_PATH));
+      } finally {
+        cluster2.shutdown();
+      }
+      
+      // Check that, if we try to write to the old NN,
+      // it aborts.
+      try {
+        cluster.getFileSystem().mkdirs(new Path("/x"));
+        fail("Did not abort trying to write to a fenced NN");
+      } catch (RemoteException re) {
+        GenericTestUtils.assertExceptionContains(
+            "Could not sync enough journals to persistent storage", re);
+      }
+    } finally {
+      // Skipped: the old NN has already aborted after being fenced,
+      // so it cannot be shut down cleanly here.
+      //cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testMismatchedNNIsRejected() throws Exception {
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
+        mjc.getQuorumJournalURI("myjournal").toString());
+    
+    // Start a NN, so the storage is formatted with its namespace info. 
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(0)
+      .manageNameDfsDirs(false)
+      .build();
+    cluster.shutdown();
+    
+    // Create a new (freshly-formatted) NN, which should not be able to
+    // reuse the same journal, since its namespace info would not match.
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .manageNameDfsDirs(false)
+        .build();
+      fail("New NN with different namespace should have been rejected");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Unable to start log segment 1: too few journals", ioe);
+    }
+  }
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+public class TestEpochsAreUnique {
+  private static final Log LOG = LogFactory.getLog(TestEpochsAreUnique.class);
+  private static final String JID = "testEpochsAreUnique-jid";
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  private Random r = new Random();
+  
+  @Test
+  public void testSingleThreaded() throws IOException {
+    Configuration conf = new Configuration();
+    MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
+    URI uri = cluster.getQuorumJournalURI(JID);
+    try {
+      // With no failures or contention, epochs should increase one-by-one
+      for (int i = 0; i < 5; i++) {
+        AsyncLoggerSet als = new AsyncLoggerSet(
+            QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO));
+        als.createNewUniqueEpoch(FAKE_NSINFO);
+        assertEquals(i + 1, als.getEpoch());
+      }
+      
+      long prevEpoch = 5;
+      // With some failures injected, it should still always increase, perhaps
+      // skipping some
+      for (int i = 0; i < 20; i++) {
+        AsyncLoggerSet als = new AsyncLoggerSet(
+            makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)));
+        long newEpoch = -1;
+        while (true) {
+          try {
+            als.createNewUniqueEpoch(FAKE_NSINFO);
+            newEpoch = als.getEpoch();
+            break;
+          } catch (IOException ioe) {
+            // It's OK to fail to create an epoch, since we randomly inject
+            // faults. It's possible we'll inject faults in too many of the
+            // underlying nodes, and a failure is expected in that case
+          }
+        }
+        LOG.info("Created epoch " + newEpoch);
+        assertTrue("New epoch " + newEpoch + " should be greater than previous " +
+            prevEpoch, newEpoch > prevEpoch);
+        prevEpoch = newEpoch;
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+
+  private List<AsyncLogger> makeFaulty(List<AsyncLogger> loggers) {
+    List<AsyncLogger> ret = Lists.newArrayList();
+    for (AsyncLogger l : loggers) {
+      AsyncLogger spy = Mockito.spy(l);
+      Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
+          .when(spy).getJournalState();
+      Mockito.doAnswer(new SometimesFaulty<Void>(0.40f))
+          .when(spy).newEpoch(Mockito.anyLong());
+      ret.add(spy);
+    }
+    return ret;
+  }
+  
+  private class SometimesFaulty<T> implements Answer<ListenableFuture<T>> {
+    private float faultProbability;
+
+    public SometimesFaulty(float faultProbability) {
+      this.faultProbability = faultProbability;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public ListenableFuture<T> answer(InvocationOnMock invocation)
+        throws Throwable {
+      if (r.nextFloat() < faultProbability) {
+        return Futures.immediateFailedFuture(
+            new IOException("Injected fault"));
+      }
+      return (ListenableFuture<T>)invocation.callRealMethod();
+    }
+  }
+
+
+
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.base.Supplier;
+
+public class TestIPCLoggerChannel {
+  private static final Log LOG = LogFactory.getLog(
+      TestIPCLoggerChannel.class);
+  
+  private Configuration conf = new Configuration();
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  private static final String JID = "test-journalid";
+  private static final InetSocketAddress FAKE_ADDR =
+      new InetSocketAddress(0);
+  private static final byte[] FAKE_DATA = new byte[4096];
+  
+  private QJournalProtocol mockProxy = Mockito.mock(QJournalProtocol.class);
+  private IPCLoggerChannel ch;
+  
+  private static final int LIMIT_QUEUE_SIZE_MB = 1;
+  private static final int LIMIT_QUEUE_SIZE_BYTES =
+      LIMIT_QUEUE_SIZE_MB * 1024 * 1024;
+  
+  @Before
+  public void setupMock() {
+    conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
+        LIMIT_QUEUE_SIZE_MB);
+
+    // Channel to the mock object instead of a real IPC proxy.
+    ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR) {
+      @Override
+      protected QJournalProtocol getProxy() throws IOException {
+        return mockProxy;
+      }
+    };
+    
+    ch.setEpoch(1);
+  }
+  
+  @Test
+  public void testSimpleCall() throws Exception {
+    ch.sendEdits(1, 3, FAKE_DATA).get();
+    Mockito.verify(mockProxy).journal(Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(3), Mockito.same(FAKE_DATA));
+  }
+
+  
+  /**
+   * Test that, once the queue eclipses the configured size limit,
+   * calls to journal more data are rejected.
+   */
+  @Test
+  public void testQueueLimiting() throws Exception {
+    
+    // Block the underlying fake proxy from actually completing any calls.
+    DelayAnswer delayer = new DelayAnswer(LOG);
+    Mockito.doAnswer(delayer).when(mockProxy).journal(
+        Mockito.<RequestInfo>any(),
+        Mockito.eq(1L), Mockito.eq(1), Mockito.same(FAKE_DATA));
+    
+    // Queue up the maximum number of calls.
+    int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length;
+    for (int i = 1; i <= numToQueue; i++) {
+      ch.sendEdits((long)i, 1, FAKE_DATA);
+    }
+    
+    // The accounting should show the correct total number queued.
+    assertEquals(LIMIT_QUEUE_SIZE_BYTES, ch.getQueuedEditsSize());
+    
+    // Trying to queue any more should fail.
+    try {
+      ch.sendEdits(numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS);
+      fail("Did not fail to queue more calls after queue was full");
+    } catch (ExecutionException ee) {
+      if (!(ee.getCause() instanceof LoggerTooFarBehindException)) {
+        throw ee;
+      }
+    }
+    
+    delayer.proceed();
+
+    // After we allow it to proceed, it should chug through the original queue
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return ch.getQueuedEditsSize() == 0;
+      }
+    }, 10, 1000);
+  }
+
+}
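
The queue limit in testQueueLimiting is counted in bytes, not calls, and the numbers divide evenly, which is why the accounting assertion can be exact:

    int limitBytes = 1 * 1024 * 1024;      // LIMIT_QUEUE_SIZE_MB in bytes
    int payload = 4096;                    // FAKE_DATA.length
    int numToQueue = limitBytes / payload; // 256 sends exactly fill the queue
    // The 257th send then fails with LoggerTooFarBehindException.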

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdfs.qjournal.client.QuorumCall;
+import org.junit.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.SettableFuture;
+
+public class TestQuorumCall {
+  @Test(timeout=10000)
+  public void testQuorums() throws Exception {
+    Map<String, SettableFuture<String>> futures = ImmutableMap.of(
+        "f1", SettableFuture.<String>create(),
+        "f2", SettableFuture.<String>create(),
+        "f3", SettableFuture.<String>create());
+    
+    QuorumCall<String, String> q = QuorumCall.create(futures);
+    assertEquals(0, q.countResponses());
+    
+    futures.get("f1").set("first future");
+    q.waitFor(1, 0, 0, 100000); // wait for 1 response
+    q.waitFor(0, 1, 0, 100000); // wait for 1 success
+    assertEquals(1, q.countResponses());
+    
+    
+    futures.get("f2").setException(new Exception("error"));
+    assertEquals(2, q.countResponses());
+    
+    futures.get("f3").set("second future");
+    q.waitFor(3, 0, 100, 100000); // wait for 3 responses
+    q.waitFor(0, 2, 100, 100000); // 2 successes
+
+    assertEquals(3, q.countResponses());
+    assertEquals("f1=first future,f3=second future",
+        Joiner.on(",").withKeyValueSeparator("=").join(
+            new TreeMap<String, String>(q.getResults())));
+    
+    try {
+      q.waitFor(0, 4, 100, 10);
+      fail("Didn't time out waiting for more responses than came back");
+    } catch (TimeoutException te) {
+      // expected
+    }
+  }
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Functional tests for QuorumJournalManager.
+ * For true unit tests, see {@link TestQuorumJournalManagerUnit}.
+ */
+public class TestQuorumJournalManager {
+  private static final Log LOG = LogFactory.getLog(
+      TestQuorumJournalManager.class);
+  
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  private static final String JID = "testQuorumJournalManager";
+  private MiniJournalCluster cluster;
+  private Configuration conf;
+  private QuorumJournalManager qjm;
+  private List<AsyncLogger> spies;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new Configuration();
+    cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+    
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    qjm.recoverUnfinalizedSegments();
+    assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
+  }
+  
+  @After
+  public void shutdown() throws IOException {
+    cluster.shutdown();
+  }
+  
+  @Test
+  public void testSingleWriter() throws Exception {
+    writeSegment(qjm, 1, 3, true);
+    
+    // Should be finalized
+    checkRecovery(cluster, 1, 3);
+    
+    // Start a new segment
+    writeSegment(qjm, 4, 1, true);
+
+    // Should be finalized
+    checkRecovery(cluster, 4, 4);
+  }
+  
+  @Test
+  public void testOrchestratedFailures() throws Exception {
+    writeSegment(qjm, 1, 3, true);
+    writeSegment(qjm, 4, 3, true);
+    
+    SortedSet<Long> serials = Sets.newTreeSet();
+    for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) {
+      IPCLoggerChannel ch = (IPCLoggerChannel)l;
+      ch.waitForAllPendingCalls();
+      serials.add(ch.getNextIpcSerial());
+    }
+
+    // All of the loggers should have sent the same number of RPCs, since there
+    // were no failures.
+    assertEquals(1, serials.size());
+    
+    long maxSerial = serials.first();  // the set has exactly one element, per the assert above
+    LOG.info("Max IPC serial = " + maxSerial);
+    
+    cluster.shutdown();
+    
+    cluster = new MiniJournalCluster.Builder(conf)
+      .build();
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+  }
+  
+  /**
+   * Test case where a new writer picks up from an old one with no failures
+   * and the previous unfinalized segment is entirely consistent -- i.e. all
+   * the JournalNodes end at the same transaction ID.
+   */
+  @Test
+  public void testChangeWritersLogsInSync() throws Exception {
+    writeSegment(qjm, 1, 3, false);
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Make a new QJM
+    qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO);
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 3);
+  }
+  
+  /**
+   * Test case where a new writer picks up from an old one which crashed
+   * with the three loggers at different txnids
+   */
+  @Test
+  public void testChangeWritersLogsOutOfSync1() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [x, 4, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(0, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync2() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, x, 5]
+    // Should recover to txn 5
+    doOutOfSyncTest(1, 5L);
+  }
+
+  @Test
+  public void testChangeWritersLogsOutOfSync3() throws Exception {
+    // Journal states:  [3, 4, 5]
+    // During recovery: [3, 4, x]
+    // Should recover to txn 4
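+    // (the longest log is absent, so recovery settles for the longest
+    // log among the remaining quorum)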
+    doOutOfSyncTest(2, 4L);
+  }
+
+  private void doOutOfSyncTest(int missingOnRecoveryIdx,
+      long expectedRecoveryTxnId) throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+    
+    failLoggerAtTxn(spies.get(0), 4);
+    failLoggerAtTxn(spies.get(1), 5);
+    
+    writeTxns(stm, 1, 3);
+    
+    // This should succeed on 2/3 of the loggers
+    writeTxns(stm, 4, 1);
+    
+    // This should only succeed on one logger (index 2), so the quorum
+    // write should fail
+    try {
+      writeTxns(stm, 5, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3",
+          qe);
+    }
+    
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(1));
+
+    // Shut down the specified JN, so it's not present during recovery.
+    cluster.getJournalNode(missingOnRecoveryIdx).stopAndJoin(0);
+
+    // Make a new QJM
+    qjm = createSpyingQJM();
+    
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, expectedRecoveryTxnId);
+  }
+
+  private void failLoggerAtTxn(AsyncLogger spy, long txid) {
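+    // Stub the spy so that the sendEdits call for this txid returns a
+    // failed future, simulating a logger that dies at that transaction.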
+    TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure"))
+      .when(spy).sendEdits(
+        Mockito.eq(txid), Mockito.eq(1), Mockito.<byte[]>any());
+  }
+
+  /**
+   * edit lengths [3,4,5]
+   * first recovery:
+   * - sees [3,4,x]
+   * - picks length 4 for recoveryEndTxId
+   * - calls acceptRecovery()
+   * - crashes before finalizing
+   * second recovery:
+   * - sees [x, 4, 5]
+   * - should pick recovery length 4, even though it saw
+   *   a larger txid, because a previous recovery accepted it
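+   *   (the standard Paxos acceptor rule: a value accepted by a previous
+   *   round takes precedence over a longer, merely-observed log)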
+   */
+  @Test
+  public void testRecoverAfterIncompleteRecovery() throws Exception {
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+    
+    failLoggerAtTxn(spies.get(0), 4);
+    failLoggerAtTxn(spies.get(1), 5);
+    
+    writeTxns(stm, 1, 3);
+    
+    // This should succeed on 2/3 of the loggers
+    writeTxns(stm, 4, 1);
+    
+    // This should only succeed on one logger (index 2), so the quorum
+    // write should fail
+    try {
+      writeTxns(stm, 5, 1);
+      fail("Did not fail to write when only a minority succeeded");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains(
+          "too many exceptions to achieve quorum size 2/3",
+          qe);
+    }
+
+    // Shut down the logger that has length = 5
+    cluster.getJournalNode(2).stopAndJoin(0);
+
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+
+    // Prevent every logger from finalizing
+    for (AsyncLogger spy : spies) {
+      TestQuorumJournalManagerUnit.futureThrows(new IOException("injected"))
+        .when(spy).finalizeLogSegment(Mockito.eq(1L),
+            Mockito.eq(4L));
+    }
+    try {
+      qjm.recoverUnfinalizedSegments();
+      fail("Should have failed recovery since no finalization occurred");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains("injected", ioe);
+    }
+    
+    // Now bring back the logger that had 5, and run recovery again.
+    // We should recover to 4, even though there's a longer log.
+    cluster.getJournalNode(0).stopAndJoin(0);
+    cluster.restartJournalNode(2);
+    
+    qjm = createSpyingQJM();
+    spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    qjm.recoverUnfinalizedSegments();
+    checkRecovery(cluster, 1, 4);
+  }
+
+  private QuorumJournalManager createSpyingQJM()
+      throws IOException, URISyntaxException {
+    return new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) {
+          @Override
+          protected List<AsyncLogger> createLoggers() throws IOException {
+            LOG.info("Creating spy loggers");
+            List<AsyncLogger> realLoggers = super.createLoggers();
+            List<AsyncLogger> spies = Lists.newArrayList();
+            for (AsyncLogger logger : realLoggers) {
+              spies.add(Mockito.spy(logger));
+            }
+            return spies;
+          }
+    };
+  }
+
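+  /** Start a segment at startTxId, write numTxns edits, and optionally finalize it. */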
+  private void writeSegment(QuorumJournalManager qjm,
+      int startTxId, int numTxns, boolean finalize) throws IOException {
+    EditLogOutputStream stm = qjm.startLogSegment(startTxId);
+    // Should create in-progress
+    assertExistsInQuorum(cluster,
+        NNStorage.getInProgressEditsFileName(startTxId));
+    
+    writeTxns(stm, startTxId, numTxns);
+    if (finalize) {
+      stm.close();
+      qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1);
+    }
+  }
+
+  private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns)
+      throws IOException {
+    for (long txid = startTxId; txid < startTxId + numTxns; txid++) {
+      TestQuorumJournalManagerUnit.writeOp(stm, txid);
+    }
+    stm.setReadyToFlush();
+    stm.flush();
+  }
+
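+  /** Assert that the given file name exists on at least a quorum of the journal dirs. */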
+  private void assertExistsInQuorum(MiniJournalCluster cluster,
+      String fname) {
+    int count = 0;
+    for (int i = 0; i < 3; i++) {
+      File dir = cluster.getCurrentDir(i, JID);
+      if (new File(dir, fname).exists()) {
+        count++;
+      }
+    }
+    assertTrue("File " + fname + " should exist in a quorum of dirs",
+        count >= cluster.getQuorumSize());
+  }
+  
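+  /** Assert that a quorum of nodes finalized the given segment at the expected end txid. */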
+  private void checkRecovery(MiniJournalCluster cluster,
+      long segmentTxId, long expectedEndTxId)
+      throws IOException {
+    int numFinalized = 0;
+    for (int i = 0; i < cluster.getNumNodes(); i++) {
+      File logDir = cluster.getCurrentDir(i, JID);
+      EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId);
+      if (elf == null) {
+        continue;
+      }
+      if (!elf.isInProgress()) {
+        numFinalized++;
+        if (elf.getLastTxId() != expectedEndTxId) {
+          fail("File " + elf + " finalized to wrong txid, expected " +
+              expectedEndTxId);
+        }
+      }      
+    }
+    
+    if (numFinalized < cluster.getQuorumSize()) {
+      fail("Did not find a quorum of finalized logs starting at " +
+          segmentTxId);
+    }
+  }
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.client;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.eq;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumException;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+/**
+ * True unit tests for QuorumJournalManager
+ */
+public class TestQuorumJournalManagerUnit {
+  static {
+    ((Log4JLogger)QuorumJournalManager.LOG).getLogger().setLevel(Level.ALL);
+  }
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+
+  private Configuration conf = new Configuration();
+  private List<AsyncLogger> spyLoggers;
+  private QuorumJournalManager qjm;
+  
+  @Before
+  public void setup() throws Exception {
+    spyLoggers = ImmutableList.of(
+        mockLogger(),
+        mockLogger(),
+        mockLogger());
+
+    qjm = new QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) {
+      @Override
+      protected List<AsyncLogger> createLoggers() {
+        return spyLoggers;
+      }
+    };
+
+    for (AsyncLogger logger : spyLoggers) {
+      futureReturns(GetJournalStateResponseProto.newBuilder()
+          .setLastPromisedEpoch(0)
+          .setHttpPort(-1)
+          .build())
+        .when(logger).getJournalState();
+      
+      futureReturns(
+          NewEpochResponseProto.newBuilder().build()
+          ).when(logger).newEpoch(Mockito.anyLong());
+    }
+    
+    qjm.recoverUnfinalizedSegments();
+  }
+  
+  private AsyncLogger mockLogger() {
+    return Mockito.mock(AsyncLogger.class);
+  }
+  
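+  /** @return a Mockito stubber whose stubbed call returns an already-successful future. */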
+  static <V> Stubber futureReturns(V value) {
+    ListenableFuture<V> ret = Futures.immediateFuture(value);
+    return Mockito.doReturn(ret);
+  }
+  
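+  /** @return a Mockito stubber whose stubbed call returns an already-failed future. */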
+  static Stubber futureThrows(Throwable t) {
+    ListenableFuture<?> ret = Futures.immediateFailedFuture(t);
+    return Mockito.doReturn(ret);
+  }
+
+  @Test
+  public void testAllLoggersStartOk() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    qjm.startLogSegment(1);
+  }
+
+  @Test
+  public void testQuorumOfLoggersStartOk() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+      .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    qjm.startLogSegment(1);
+  }
+  
+  @Test
+  public void testQuorumOfLoggersFail() throws Exception {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+    .when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureThrows(new IOException("logger failed"))
+      .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    try {
+      qjm.startLogSegment(1);
+      fail("Did not throw when quorum failed");
+    } catch (QuorumException qe) {
+      GenericTestUtils.assertExceptionContains("logger failed", qe);
+    }
+  }
+  
+  @Test
+  public void testWriteEdits() throws Exception {
+    EditLogOutputStream stm = createLogSegment();
+    writeOp(stm, 1);
+    writeOp(stm, 2);
+    
+    stm.setReadyToFlush();
+    writeOp(stm, 3);
+    
+    // The flush should log txns 1-2
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        eq(1L), eq(2), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        eq(1L), eq(2), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(2)).sendEdits(
+        eq(1L), eq(2), Mockito.<byte[]>any());
+    stm.flush();
+
+    // Another flush should now log txn #3
+    stm.setReadyToFlush();
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        eq(3L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        eq(3L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(2)).sendEdits(
+        eq(3L), eq(1), Mockito.<byte[]>any());
+    stm.flush();
+  }
+  
+  @Test
+  public void testWriteEditsOneSlow() throws Exception {
+    EditLogOutputStream stm = createLogSegment();
+    writeOp(stm, 1);
+    stm.setReadyToFlush();
+    
+    // Make the first two loggers respond immediately
+    futureReturns(null).when(spyLoggers.get(0)).sendEdits(
+        eq(1L), eq(1), Mockito.<byte[]>any());
+    futureReturns(null).when(spyLoggers.get(1)).sendEdits(
+        eq(1L), eq(1), Mockito.<byte[]>any());
+    
+    // Make the third logger hang: its future is never completed
+    SettableFuture<Void> slowLog = SettableFuture.<Void>create();
+    Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits(
+        eq(1L), eq(1), Mockito.<byte[]>any());
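+    // flush() should still return once a quorum (2 of 3) has acked,
+    // without waiting for the slow logger.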
+    stm.flush();
+  }
+
+  private EditLogOutputStream createLogSegment() throws IOException {
+    futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong());
+    futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong());
+    EditLogOutputStream stm = qjm.startLogSegment(1);
+    return stm;
+  }
+
+  static void writeOp(EditLogOutputStream stm, long txid) throws IOException {
+    FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid);
+    op.setTransactionId(txid);
+    stm.write(op);
+  }
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
+import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJournal {
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  private static final NamespaceInfo FAKE_NSINFO_2 = new NamespaceInfo(
+      6789, "mycluster", "my-bp", 0L, 0);
+  
+  private static final String JID = "test-journal";
+
+  private static final File TEST_LOG_DIR = new File(
+      new File(MiniDFSCluster.getBaseDirectory()), "TestJournal");
+
+  private StorageErrorReporter mockErrorReporter = Mockito.mock(
+      StorageErrorReporter.class);
+
+  private Journal journal;
+
+  @Before
+  public void setup() throws Exception {
+    FileUtil.fullyDelete(TEST_LOG_DIR);
+    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+  }
+  
+  @After
+  public void verifyNoStorageErrors() throws Exception {
+    Mockito.verify(mockErrorReporter, Mockito.never())
+      .reportErrorOnFile(Mockito.<File>any());
+  }
+  
+  @Test
+  public void testEpochHandling() throws Exception {
+    assertEquals(0, journal.getLastPromisedEpoch());
+    NewEpochResponseProto newEpoch =
+        journal.newEpoch(FAKE_NSINFO, 1);
+    assertFalse(newEpoch.hasLastSegmentTxId());
+    assertEquals(1, journal.getLastPromisedEpoch());
+    newEpoch = journal.newEpoch(FAKE_NSINFO, 3);
+    assertFalse(newEpoch.hasLastSegmentTxId());
+    assertEquals(3, journal.getLastPromisedEpoch());
+    try {
+      journal.newEpoch(FAKE_NSINFO, 3);
+      fail("Should have failed to promise same epoch twice");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Proposed epoch 3 <= last promise 3", ioe);
+    }
+    try {
+      journal.startLogSegment(new RequestInfo(JID, 1L, 1L),
+          12345L);
+      fail("Should have rejected call from prior epoch");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 3", ioe);
+    }
+    try {
+      journal.journal(new RequestInfo(JID, 1L, 1L),
+          100L, 0, new byte[0]);
+      fail("Should have rejected call from prior epoch");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 3", ioe);
+    }
+  }
+
+  @Test
+  public void testRestartJournal() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+    journal.startLogSegment(new RequestInfo("j", 1, 1), 1);
+    journal.journal(new RequestInfo("j", 1, 2), 1, 2, 
+        QJMTestUtil.createTxnData(1, 2));
+    // Don't finalize.
+    
+    journal.close(); // close to unlock the storage dir
+    
+    // Now re-instantiate, make sure history is still there
+    journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    assertEquals(1, journal.getLastPromisedEpoch());
+    NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2);
+    assertEquals(1, newEpoch.getLastSegmentTxId());
+  }
+  
+  @Test
+  public void testJournalLocking() throws Exception {
+    StorageDirectory sd = journal.getStorage().getStorageDir(0);
+    File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
+
+    // Journal should not be locked, since we lazily initialize it.
+    assertFalse(lockFile.exists());
+
+    journal.newEpoch(FAKE_NSINFO, 1);
+    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
+    
+    // Journal should be locked
+    GenericTestUtils.assertExists(lockFile);
+    
+    Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    try {
+      journal2.newEpoch(FAKE_NSINFO, 2);
+      fail("Did not fail to create another journal in same dir");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Cannot lock storage", ioe);
+    }
+    
+    journal.close();
+    
+    // Journal should no longer be locked after the close() call.
+    journal2.newEpoch(FAKE_NSINFO, 2);
+  }
+  
+  @Test
+  public void testNamespaceVerification() throws Exception {
+    journal.newEpoch(FAKE_NSINFO, 1);
+
+    try {
+      journal.newEpoch(FAKE_NSINFO_2, 2);
+      fail("Did not fail newEpoch() when namespaces mismatched");
+    } catch (IOException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "Incompatible namespaceID", ioe);
+    }
+  }
+
+}

Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1363596&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (added)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Fri Jul 20 00:25:50 2012
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
+import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.server.Journal;
+import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
+
+
+public class TestJournalNode {
+  private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
+      12345, "mycluster", "my-bp", 0L, 0);
+  private static final String JID = "test-journalid";
+
+  private JournalNode jn;
+  private Journal journal; 
+  private Configuration conf = new Configuration();
+  private IPCLoggerChannel ch;
+
+  static {
+    // Avoid an error when we double-initialize JvmMetrics
+    DefaultMetricsSystem.setMiniClusterMode(true);
+  }
+  
+  @Before
+  public void setup() throws Exception {
+    conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+        "0.0.0.0:0");
+    jn = new JournalNode();
+    jn.setConf(conf);
+    jn.start();
+    journal = jn.getOrCreateJournal(JID);
+    journal.format(FAKE_NSINFO);
+    
+    ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
+  }
+  
+  @After
+  public void teardown() throws Exception {
+    jn.stop(0);
+  }
+  
+  @Test
+  public void testJournal() throws Exception {
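+    // Basic round trip: claim an epoch, start a segment, and send one edit.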
+    IPCLoggerChannel ch = new IPCLoggerChannel(
+        conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1, 1, "hello".getBytes(Charsets.UTF_8)).get();
+  }
+
+  @Test
+  public void testReturnsSegmentInfoAtEpochTransition() throws Exception {
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1, 2, QJMTestUtil.createTxnData(1, 2)).get();
+    
+    // Switch to a new epoch without closing earlier segment
+    NewEpochResponseProto response = ch.newEpoch(2).get();
+    ch.setEpoch(2);
+    assertEquals(1, response.getLastSegmentTxId());
+    
+    ch.finalizeLogSegment(1, 2).get();
+    
+    // Switch to a new epoch after just closing the earlier segment.
+    response = ch.newEpoch(3).get();
+    ch.setEpoch(3);
+    assertEquals(1, response.getLastSegmentTxId());
+    
+    // Start a segment but don't write anything, check newEpoch segment info
+    ch.startLogSegment(3).get();
+    response = ch.newEpoch(4).get();
+    ch.setEpoch(4);
+    assertEquals(3, response.getLastSegmentTxId());
+  }
+  
+  @Test
+  public void testHttpServer() throws Exception {
+    InetSocketAddress addr = jn.getBoundHttpAddress();
+    assertTrue(addr.getPort() > 0);
+    
+    String urlRoot = "http://localhost:" + addr.getPort();
+    
+    // Check default servlets.
+    String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx"));
+    assertTrue("Bad contents: " + pageContents,
+        pageContents.contains(
+            "Hadoop:service=JournalNode,name=JvmMetrics"));
+    
+    // Check JSP page.
+    pageContents = DFSTestUtil.urlGet(
+        new URL(urlRoot + "/journalstatus.jsp"));
+    assertTrue(pageContents.contains("JournalNode"));
+
+    // Create some edits on server side
+    byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3);
+    IPCLoggerChannel ch = new IPCLoggerChannel(
+        conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress());
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    ch.startLogSegment(1).get();
+    ch.sendEdits(1, 3, EDITS_DATA).get();
+    ch.finalizeLogSegment(1, 3).get();
+
+    // Attempt to retrieve via HTTP, ensure we get the data back
+    // including the header we expected
+    byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot +
+        "/getJournal?segmentTxId=1&jid=" + JID));
+    byte[] expected = Bytes.concat(
+            Ints.toByteArray(HdfsConstants.LAYOUT_VERSION),
+            EDITS_DATA);
+
+    assertArrayEquals(expected, retrievedViaHttp);
+    
+    // Attempt to fetch a non-existent file, check that we get an
+    // error status code
+    URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + JID);
+    HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection();
+    try {
+      assertEquals(404, connection.getResponseCode());
+    } finally {
+      connection.disconnect();
+    }
+  }
+
+  /**
+   * Test that the JournalNode performs correctly as a Paxos
+   * <em>Acceptor</em> process.
+   */
+  @Test
+  public void testAcceptRecoveryBehavior() throws Exception {
+    // We need to run newEpoch() first, or else we have no way to distinguish
+    // different proposals for the same decision.
+    try {
+      ch.prepareRecovery(1L).get();
+      fail("Did not throw IllegalState when trying to run paxos without an epoch");
+    } catch (ExecutionException ise) {
+      GenericTestUtils.assertExceptionContains("bad epoch", ise);
+    }
+    
+    ch.newEpoch(1).get();
+    ch.setEpoch(1);
+    
+    // prepare() with no previously accepted value and no logs present
+    PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get();
+    System.err.println("Prep: " + prep);
+    assertFalse(prep.hasAcceptedInEpoch());
+    assertFalse(prep.hasSegmentState());
+    
+    // Make a log segment, and prepare again -- this time should see the
+    // segment existing.
+    ch.startLogSegment(1L).get();
+    ch.sendEdits(1L, 1, QJMTestUtil.createTxnData(1, 1)).get();
+
+    prep = ch.prepareRecovery(1L).get();
+    System.err.println("Prep: " + prep);
+    assertFalse(prep.hasAcceptedInEpoch());
+    assertTrue(prep.hasSegmentState());
+    
+    // accept() should save the accepted value in persistent storage
+    // TODO: should be able to accept without a URL here
+    ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+
+    // So another prepare() call from a new epoch would return this value
+    ch.newEpoch(2).get();
+    ch.setEpoch(2);
+    prep = ch.prepareRecovery(1L).get();
+    assertEquals(1L, prep.getAcceptedInEpoch());
+    assertEquals(1L, prep.getSegmentState().getEndTxId());
+    
+    // A prepare() or accept() call from an earlier epoch should now be rejected
+    ch.setEpoch(1);
+    try {
+      ch.prepareRecovery(1L).get();
+      fail("prepare from earlier epoch not rejected");
+    } catch (ExecutionException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 2",
+          ioe);
+    }
+    try {
+      ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get();
+      fail("accept from earlier epoch not rejected");
+    } catch (ExecutionException ioe) {
+      GenericTestUtils.assertExceptionContains(
+          "epoch 1 is less than the last promised epoch 2",
+          ioe);
+    }
+  }
+  
+  // TODO:
+  // - add test that checks formatting behavior
+  // - add test that checks rejects newEpoch if nsinfo doesn't match
+  
+}

Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1363596&r1=1363595&r2=1363596&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Jul 20 00:25:50 2012
@@ -17,11 +17,16 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -183,6 +188,15 @@ public class NameNodeAdapter {
     }
   }
   
+  public static FSEditLogOp createMkdirOp(String path) {
+    MkdirOp op = MkdirOp.getInstance(new FSEditLogOp.OpInstanceCache())
+      .setPath(path)
+      .setTimestamp(0)
+      .setPermissionStatus(new PermissionStatus(
+              "testuser", "testgroup", FsPermission.getDefault()));
+    return op;
+  }
+  
   /**
    * @return the number of blocks marked safe by safemode, or -1
    * if safemode is not running.


