hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1417596 [3/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Date: Wed, 05 Dec 2012 19:22:25 GMT
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.security.Principal;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.GetImageServlet;
+import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * This servlet is used in two cases:
+ * <ul>
+ * <li>The QuorumJournalManager, when reading edits, fetches the edit streams
+ * from the journal nodes.</li>
+ * <li>During edits synchronization, one journal node will fetch edits from
+ * another journal node.</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+public class GetJournalEditServlet extends HttpServlet {
+
+  private static final long serialVersionUID = -4635891628211723009L;
+  private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class);
+
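+  // Names of the query parameters accepted by this servlet; see
+  // buildPath() below and the parsing in doGet().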
+  static final String STORAGEINFO_PARAM = "storageInfo";
+  static final String JOURNAL_ID_PARAM = "jid";
+  static final String SEGMENT_TXID_PARAM = "segmentTxId";
+
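+  /**
+   * Check whether the requesting principal is allowed to fetch edits:
+   * either one of the configured NameNode/SecondaryNameNode principals,
+   * or another JournalNode running as the same user as this one.
+   */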
+  protected boolean isValidRequestor(HttpServletRequest request, Configuration conf)
+      throws IOException {
+    Principal remoteUserPrincipal = request.getUserPrincipal();
+    if (remoteUserPrincipal == null) { // This really shouldn't happen...
+      LOG.warn("Received null user principal while authorizing access to " +
+          "GetJournalEditServlet");
+      return false;
+    }
+    String remotePrincipal = remoteUserPrincipal.getName();
+    String remoteShortName = request.getRemoteUser();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Validating request made by " + remotePrincipal +
+          " / " + remoteShortName + ". This user is: " +
+          UserGroupInformation.getLoginUser());
+    }
+
+    Set<String> validRequestors = new HashSet<String>();
+    validRequestors.addAll(DFSUtil.getAllNnPrincipals(conf));
+    validRequestors.add(
+        SecurityUtil.getServerPrincipal(conf
+            .get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY),
+            SecondaryNameNode.getHttpAddress(conf).getHostName()));
+
+    // Check the full principal name of all the configured valid requestors.
+    for (String v : validRequestors) {
+      if (LOG.isDebugEnabled())
+        LOG.debug("isValidRequestor is comparing to valid requestor: " + v);
+      if (v != null && v.equals(remotePrincipal)) {
+        if (LOG.isDebugEnabled())
+          LOG.debug("isValidRequestor is allowing: " + remotePrincipal);
+        return true;
+      }
+    }
+
+    // Additionally, we compare the short name of the requestor to this JN's
+    // username, because we want to allow requests from other JNs during
+    // recovery, but we can't enumerate the full list of JNs.
+    if (remoteShortName.equals(
+          UserGroupInformation.getLoginUser().getShortUserName())) {
+      if (LOG.isDebugEnabled())
+        LOG.debug("isValidRequestor is allowing other JN principal: " +
+            remotePrincipal);
+      return true;
+    }
+
+    if (LOG.isDebugEnabled())
+      LOG.debug("isValidRequestor is rejecting: " + remotePrincipal);
+    return false;
+  }
+  
+  private boolean checkRequestorOrSendError(Configuration conf,
+      HttpServletRequest request, HttpServletResponse response)
+          throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()
+        && !isValidRequestor(request, conf)) {
+      response.sendError(HttpServletResponse.SC_FORBIDDEN,
+          "Only Namenode and another JournalNode may access this servlet");
+      LOG.warn("Received non-NN/JN request for edits from "
+          + request.getRemoteHost());
+      return false;
+    }
+    return true;
+  }
+  
+  private boolean checkStorageInfoOrSendError(JNStorage storage,
+      HttpServletRequest request, HttpServletResponse response)
+      throws IOException {
+    String myStorageInfoString = storage.toColonSeparatedString();
+    String theirStorageInfoString = request.getParameter(STORAGEINFO_PARAM);
+    
+    if (theirStorageInfoString != null
+        && !myStorageInfoString.equals(theirStorageInfoString)) {
+      String msg = "This node has storage info '" + myStorageInfoString
+          + "' but the requesting node expected '"
+          + theirStorageInfoString + "'";
+      
+      response.sendError(HttpServletResponse.SC_FORBIDDEN, msg);
+      LOG.warn("Received an invalid request file transfer request from " +
+          request.getRemoteAddr() + ": " + msg);
+      return false;
+    }
+    return true;
+  }
+  
+  @Override
+  public void doGet(final HttpServletRequest request,
+      final HttpServletResponse response) throws ServletException, IOException {
+    FileInputStream editFileIn = null;
+    try {
+      final ServletContext context = getServletContext();
+      final Configuration conf = (Configuration) getServletContext()
+          .getAttribute(JspHelper.CURRENT_CONF);
+      final String journalId = request.getParameter(JOURNAL_ID_PARAM);
+      QuorumJournalManager.checkJournalId(journalId);
+      final JNStorage storage = JournalNodeHttpServer
+          .getJournalFromContext(context, journalId).getStorage();
+
+      // Check security
+      if (!checkRequestorOrSendError(conf, request, response)) {
+        return;
+      }
+
+      // Check that the namespace info is correct
+      if (!checkStorageInfoOrSendError(storage, request, response)) {
+        return;
+      }
+      
+      long segmentTxId = ServletUtil.parseLongParam(request,
+          SEGMENT_TXID_PARAM);
+
+      FileJournalManager fjm = storage.getJournalManager();
+      File editFile;
+
+      synchronized (fjm) {
+        // Synchronize on the FJM so that the file doesn't get finalized
+        // out from underneath us while we're in the process of opening
+        // it up.
+        EditLogFile elf = fjm.getLogFile(
+            segmentTxId);
+        if (elf == null) {
+          response.sendError(HttpServletResponse.SC_NOT_FOUND,
+              "No edit log found starting at txid " + segmentTxId);
+          return;
+        }
+        editFile = elf.getFile();
+        GetImageServlet.setVerificationHeaders(response, editFile);
+        GetImageServlet.setFileNameHeaders(response, editFile);
+        editFileIn = new FileInputStream(editFile);
+      }
+      
+      DataTransferThrottler throttler = GetImageServlet.getThrottler(conf);
+
+      // send edits
+      TransferFsImage.getFileServer(response, editFile, editFileIn, throttler);
+
+    } catch (Throwable t) {
+      String errMsg = "getedit failed. " + StringUtils.stringifyException(t);
+      response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg);
+      throw new IOException(errMsg);
+    } finally {
+      IOUtils.closeStream(editFileIn);
+    }
+  }
+
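+  /**
+   * Build the query path used to fetch an edit log segment from this
+   * servlet. The journal ID and storage info are URL-encoded; the
+   * resulting path has the form
+   * <code>/getJournal?jid=...&amp;segmentTxId=...&amp;storageInfo=...</code>.
+   */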
+  public static String buildPath(String journalId, long segmentTxId,
+      NamespaceInfo nsInfo) {
+    StringBuilder path = new StringBuilder("/getJournal?");
+    try {
+      path.append(JOURNAL_ID_PARAM).append("=")
+          .append(URLEncoder.encode(journalId, "UTF-8"));
+      path.append("&" + SEGMENT_TXID_PARAM).append("=")
+          .append(segmentTxId);
+      path.append("&" + STORAGEINFO_PARAM).append("=")
+          .append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8"));
+    } catch (UnsupportedEncodingException e) {
+      // Never get here -- everyone supports UTF-8
+      throw new RuntimeException(e);
+    }
+    return path.toString();
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * A {@link Storage} implementation for the {@link JournalNode}.
+ * 
+ * The JN has a storage directory for each namespace for which it stores
+ * metadata. There is only a single directory per JN in the current design.
+ */
+class JNStorage extends Storage {
+
+  private final FileJournalManager fjm;
+  private final StorageDirectory sd;
+  private StorageState state;
+  
+
+  private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES =
+      ImmutableList.of(
+        Pattern.compile("edits_\\d+-(\\d+)"),
+        Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?"));
+  
+  private static final List<Pattern> PAXOS_DIR_PURGE_REGEXES = 
+      ImmutableList.of(Pattern.compile("(\\d+)"));
+
+  /**
+   * @param logDir the path to the directory in which data will be stored
+   * @param errorReporter a callback to report errors
+   * @throws IOException 
+   */
+  protected JNStorage(File logDir, StorageErrorReporter errorReporter) throws IOException {
+    super(NodeType.JOURNAL_NODE);
+    
+    sd = new StorageDirectory(logDir);
+    this.addStorageDir(sd);
+    this.fjm = new FileJournalManager(sd, errorReporter);
+    
+    analyzeStorage();
+  }
+  
+  FileJournalManager getJournalManager() {
+    return fjm;
+  }
+
+  @Override
+  public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
+    return false;
+  }
+
+  /**
+   * Find an edits file spanning the given transaction ID range.
+   * If no such file exists, an exception is thrown.
+   */
+  File findFinalizedEditsFile(long startTxId, long endTxId) throws IOException {
+    File ret = new File(sd.getCurrentDir(),
+        NNStorage.getFinalizedEditsFileName(startTxId, endTxId));
+    if (!ret.exists()) {
+      throw new IOException(
+          "No edits file for range " + startTxId + "-" + endTxId);
+    }
+    return ret;
+  }
+
+  /**
+   * @return the path for an in-progress edits file starting at the given
+   * transaction ID. This does not verify existence of the file. 
+   */
+  File getInProgressEditLog(long startTxId) {
+    return new File(sd.getCurrentDir(),
+        NNStorage.getInProgressEditsFileName(startTxId));
+  }
+  
+  /**
+   * @param segmentTxId the first txid of the segment
+   * @param epoch the epoch number of the writer which is coordinating
+   * recovery
+   * @return the temporary path in which an edits log should be stored
+   * while it is being downloaded from a remote JournalNode
+   */
+  File getSyncLogTemporaryFile(long segmentTxId, long epoch) {
+    String name = NNStorage.getInProgressEditsFileName(segmentTxId) +
+        ".epoch=" + epoch; 
+    return new File(sd.getCurrentDir(), name);
+  }
+
+  /**
+   * @return the path for the file which contains persisted data for the
+   * paxos-like recovery process for the given log segment.
+   */
+  File getPaxosFile(long segmentTxId) {
+    return new File(getPaxosDir(), String.valueOf(segmentTxId));
+  }
+  
+  File getPaxosDir() {
+    return new File(sd.getCurrentDir(), "paxos");
+  }
+  
+  /**
+   * Remove any log files and associated paxos files which are older than
+   * the given txid.
+   */
+  void purgeDataOlderThan(long minTxIdToKeep) throws IOException {
+    purgeMatching(sd.getCurrentDir(),
+        CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep);
+    purgeMatching(getPaxosDir(), PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep);
+  }
+  
+  /**
+   * Purge files in the given directory which match any of the set of patterns.
+   * The patterns must have a single numeric capture group which determines
+   * the associated transaction ID of the file. Only those files for which
+   * the transaction ID is less than the <code>minTxIdToKeep</code> parameter
+   * are removed.
+   */
+  private static void purgeMatching(File dir, List<Pattern> patterns,
+      long minTxIdToKeep) throws IOException {
+
+    for (File f : FileUtil.listFiles(dir)) {
+      if (!f.isFile()) continue;
+      
+      for (Pattern p : patterns) {
+        Matcher matcher = p.matcher(f.getName());
+        if (matcher.matches()) {
+          // This parsing will always succeed since the group(1) is
+          // /\d+/ in the regex itself.
+          long txid = Long.valueOf(matcher.group(1));
+          if (txid < minTxIdToKeep) {
+            LOG.info("Purging no-longer needed file " + txid);
+            if (!f.delete()) {
+              LOG.warn("Unable to delete no-longer-needed data " +
+                  f);
+            }
+            break;
+          }
+        }
+      }
+    }
+  }
+
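+  /**
+   * Format the storage directory for the given namespace: record the
+   * namespace info, clear the directory, write the storage properties,
+   * create the paxos directory, and re-analyze (and re-lock) the storage.
+   */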
+  void format(NamespaceInfo nsInfo) throws IOException {
+    setStorageInfo(nsInfo);
+    LOG.info("Formatting journal storage directory " + 
+        sd + " with nsid: " + getNamespaceID());
+    // Unlock the directory before formatting, because we will
+    // re-analyze it after format(). The analyzeStorage() call
+    // below is responsible for re-locking it. This is a no-op
+    // if the storage is not currently locked.
+    unlockAll();
+    sd.clearDirectory();
+    writeProperties(sd);
+    if (!getPaxosDir().mkdirs()) {
+      throw new IOException("Could not create paxos dir: " + getPaxosDir());
+    }
+    analyzeStorage();
+  }
+
+  
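+  /**
+   * Analyze (and lock) the storage directory, and read its properties
+   * if it has already been formatted and is in a NORMAL state.
+   */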
+  void analyzeStorage() throws IOException {
+    this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
+    if (state == StorageState.NORMAL) {
+      readProperties(sd);
+    }
+  }
+
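+  /**
+   * Verify that the given namespace info matches the namespace ID and
+   * cluster ID recorded in this storage directory.
+   * @throws IOException if either does not match
+   */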
+  void checkConsistentNamespace(NamespaceInfo nsInfo)
+      throws IOException {
+    if (nsInfo.getNamespaceID() != getNamespaceID()) {
+      throw new IOException("Incompatible namespaceID for journal " +
+          this.sd + ": NameNode has nsId " + nsInfo.getNamespaceID() +
+          " but storage has nsId " + getNamespaceID());
+    }
+    
+    if (!nsInfo.getClusterID().equals(getClusterID())) {
+      throw new IOException("Incompatible clusterID for journal " +
+          this.sd + ": NameNode has clusterId '" + nsInfo.getClusterID() +
+          "' but storage has clusterId '" + getClusterID() + "'");
+      
+    }
+  }
+
+  public void close() throws IOException {
+    LOG.info("Closing journal storage for " + sd);
+    unlockAll();
+  }
+
+  public boolean isFormatted() {
+    return state == StorageState.NORMAL;
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,953 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.hdfs.util.BestEffortLongFile;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.SecurityUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
+import com.google.common.collect.Ranges;
+import com.google.protobuf.TextFormat;
+
+/**
+ * A JournalNode can manage journals for several clusters at once.
+ * Each such journal is entirely independent despite being hosted by
+ * the same JVM.
+ */
+class Journal implements Closeable {
+  static final Log LOG = LogFactory.getLog(Journal.class);
+
+
+  // Current writing state
+  private EditLogOutputStream curSegment;
+  private long curSegmentTxId = HdfsConstants.INVALID_TXID;
+  private long nextTxId = HdfsConstants.INVALID_TXID;
+  private long highestWrittenTxId = 0;
+  
+  private final String journalId;
+  
+  private final JNStorage storage;
+
+  /**
+   * When a new writer comes along, it asks each node to promise
+   * to ignore requests from any previous writer, as identified
+   * by epoch number. In order to make such a promise, the epoch
+   * number of that writer is stored persistently on disk.
+   */
+  private PersistentLongFile lastPromisedEpoch;
+
+  /**
+   * Each IPC that comes from a given client contains a serial number
+   * which only increases from the client's perspective. Whenever
+   * we switch epochs, we reset this back to -1. Whenever an IPC
+   * comes from a client, we ensure that it is strictly higher
+   * than any previous IPC. This guards against any bugs in the IPC
+   * layer that would re-order IPCs or cause a stale retry from an old
+   * request to resurface and confuse things.
+   */
+  private long currentEpochIpcSerial = -1;
+  
+  /**
+   * The epoch number of the last writer to actually write a transaction.
+   * This is used to differentiate log segments after a crash at the very
+   * beginning of a segment. See the 'testNewerVersionOfSegmentWins'
+   * test case.
+   */
+  private PersistentLongFile lastWriterEpoch;
+  
+  /**
+   * Lower-bound on the last committed transaction ID. This is not
+   * depended upon for correctness, but acts as a sanity check
+   * during the recovery procedures, and as a visibility mark
+   * for clients reading in-progress logs.
+   */
+  private BestEffortLongFile committedTxnId;
+  
+  private static final String LAST_PROMISED_FILENAME = "last-promised-epoch";
+  private static final String LAST_WRITER_EPOCH = "last-writer-epoch";
+  private static final String COMMITTED_TXID_FILENAME = "committed-txid";
+  
+  private final FileJournalManager fjm;
+
+  private final JournalMetrics metrics;
+
+
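+  /**
+   * Open the journal stored in the given directory: load the persisted
+   * epoch and committed-txid files, register metrics, and scan the
+   * storage for the latest edit segment in order to initialize the
+   * highest written transaction ID.
+   */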
+  Journal(File logDir, String journalId,
+      StorageErrorReporter errorReporter) throws IOException {
+    storage = new JNStorage(logDir, errorReporter);
+    this.journalId = journalId;
+
+    refreshCachedData();
+    
+    this.fjm = storage.getJournalManager();
+    
+    this.metrics = JournalMetrics.create(this);
+    
+    EditLogFile latest = scanStorageForLatestEdits();
+    if (latest != null) {
+      highestWrittenTxId = latest.getLastTxId();
+    }
+  }
+
+  /**
+   * Reload any data that may have been cached. This is necessary
+   * when we first load the Journal, but also after any formatting
+   * operation, since the cached data is no longer relevant.
+   */
+  private synchronized void refreshCachedData() {
+    IOUtils.closeStream(committedTxnId);
+    
+    File currentDir = storage.getSingularStorageDir().getCurrentDir();
+    this.lastPromisedEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_PROMISED_FILENAME), 0);
+    this.lastWriterEpoch = new PersistentLongFile(
+        new File(currentDir, LAST_WRITER_EPOCH), 0);
+    this.committedTxnId = new BestEffortLongFile(
+        new File(currentDir, COMMITTED_TXID_FILENAME),
+        HdfsConstants.INVALID_TXID);
+  }
+  
+  /**
+   * Scan the local storage directory, and return the segment containing
+   * the highest transaction.
+   * @return the EditLogFile with the highest transactions, or null
+   * if no files exist.
+   */
+  private synchronized EditLogFile scanStorageForLatestEdits() throws IOException {
+    if (!fjm.getStorageDirectory().getCurrentDir().exists()) {
+      return null;
+    }
+    
+    LOG.info("Scanning storage " + fjm);
+    List<EditLogFile> files = fjm.getLogFiles(0);
+    
+    while (!files.isEmpty()) {
+      EditLogFile latestLog = files.remove(files.size() - 1);
+      latestLog.validateLog();
+      LOG.info("Latest log is " + latestLog);
+      if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) {
+        // the log contains no transactions
+        LOG.warn("Latest log " + latestLog + " has no transactions. " +
+            "moving it aside and looking for previous log");
+        latestLog.moveAsideEmptyFile();
+      } else {
+        return latestLog;
+      }
+    }
+    
+    LOG.info("No files in " + fjm);
+    return null;
+  }
+
+  /**
+   * Format the local storage with the given namespace.
+   */
+  void format(NamespaceInfo nsInfo) throws IOException {
+    Preconditions.checkState(nsInfo.getNamespaceID() != 0,
+        "can't format with uninitialized namespace info: %s",
+        nsInfo);
+    LOG.info("Formatting " + this + " with namespace info: " +
+        nsInfo);
+    storage.format(nsInfo);
+    refreshCachedData();
+  }
+
+  /**
+   * Unlock and release resources.
+   */
+  @Override // Closeable
+  public void close() throws IOException {
+    storage.close();
+    
+    IOUtils.closeStream(committedTxnId);
+  }
+  
+  JNStorage getStorage() {
+    return storage;
+  }
+  
+  String getJournalId() {
+    return journalId;
+  }
+
+  /**
+   * @return the last epoch which this node has promised not to accept
+   * any lower epoch, or 0 if no promises have been made.
+   */
+  synchronized long getLastPromisedEpoch() throws IOException {
+    checkFormatted();
+    return lastPromisedEpoch.get();
+  }
+
+  synchronized public long getLastWriterEpoch() throws IOException {
+    checkFormatted();
+    return lastWriterEpoch.get();
+  }
+  
+  synchronized long getCommittedTxnIdForTests() throws IOException {
+    return committedTxnId.get();
+  }
+  
+  synchronized long getCurrentLagTxns() throws IOException {
+    long committed = committedTxnId.get();
+    if (committed == 0) {
+      return 0;
+    }
+    
+    return Math.max(committed - highestWrittenTxId, 0L);
+  }
+  
+  synchronized long getHighestWrittenTxId() {
+    return highestWrittenTxId;
+  }
+  
+  @VisibleForTesting
+  JournalMetrics getMetricsForTests() {
+    return metrics;
+  }
+
+  /**
+   * Try to create a new epoch for this journal.
+   * @param nsInfo the namespace, which is verified for consistency or used to
+   * format, if the Journal has not yet been written to.
+   * @param epoch the epoch to start
+   * @return the status information necessary to begin recovery
+   * @throws IOException if the node has already made a promise to another
+   * writer with a higher epoch number, if the namespace is inconsistent,
+   * or if a disk error occurs.
+   */
+  synchronized NewEpochResponseProto newEpoch(
+      NamespaceInfo nsInfo, long epoch) throws IOException {
+
+    checkFormatted();
+    storage.checkConsistentNamespace(nsInfo);
+
+    // Check that the new epoch being proposed is in fact newer than
+    // any other that we've promised. 
+    if (epoch <= getLastPromisedEpoch()) {
+      throw new IOException("Proposed epoch " + epoch + " <= last promise " +
+          getLastPromisedEpoch());
+    }
+    
+    updateLastPromisedEpoch(epoch);
+    abortCurSegment();
+    
+    NewEpochResponseProto.Builder builder =
+        NewEpochResponseProto.newBuilder();
+
+    EditLogFile latestFile = scanStorageForLatestEdits();
+
+    if (latestFile != null) {
+      builder.setLastSegmentTxId(latestFile.getFirstTxId());
+    }
+    
+    return builder.build();
+  }
+
+  private void updateLastPromisedEpoch(long newEpoch) throws IOException {
+    LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() +
+        " to " + newEpoch + " for client " + Server.getRemoteIp());
+    lastPromisedEpoch.set(newEpoch);
+    
+    // Since we have a new writer, reset the IPC serial - it will start
+    // counting again from 0 for this writer.
+    currentEpochIpcSerial = -1;
+  }
+
+  private void abortCurSegment() throws IOException {
+    if (curSegment == null) {
+      return;
+    }
+    
+    curSegment.abort();
+    curSegment = null;
+    curSegmentTxId = HdfsConstants.INVALID_TXID;
+  }
+
+  /**
+   * Write a batch of edits to the journal.
+   * @see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])
+   */
+  synchronized void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId,
+      int numTxns, byte[] records) throws IOException {
+    checkFormatted();
+    checkWriteRequest(reqInfo);
+
+    checkSync(curSegment != null,
+        "Can't write, no segment open");
+    
+    if (curSegmentTxId != segmentTxId) {
+      // Sanity check: it is possible that the writer will fail IPCs
+      // on both the finalize() and then the start() of the next segment.
+      // This could cause us to continue writing to an old segment
+      // instead of rolling to a new one, which breaks one of the
+      // invariants in the design. If it happens, abort the segment
+      // and throw an exception.
+      JournalOutOfSyncException e = new JournalOutOfSyncException(
+          "Writer out of sync: it thinks it is writing segment " + segmentTxId
+          + " but current segment is " + curSegmentTxId);
+      abortCurSegment();
+      throw e;
+    }
+      
+    checkSync(nextTxId == firstTxnId,
+        "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId);
+    
+    long lastTxnId = firstTxnId + numTxns - 1;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId);
+    }
+
+    // If the edit has already been marked as committed, we know
+    // it has been fsynced on a quorum of other nodes, and we are
+    // "catching up" with the rest. Hence we do not need to fsync.
+    boolean isLagging = lastTxnId <= committedTxnId.get();
+    boolean shouldFsync = !isLagging;
+    
+    curSegment.writeRaw(records, 0, records.length);
+    curSegment.setReadyToFlush();
+    Stopwatch sw = new Stopwatch();
+    sw.start();
+    curSegment.flush(shouldFsync);
+    sw.stop();
+    
+    metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS));
+
+    if (isLagging) {
+      // This batch of edits has already been committed on a quorum of other
+      // nodes. So, we are in "catch up" mode. This gets its own metric.
+      metrics.batchesWrittenWhileLagging.incr(1);
+    }
+    
+    metrics.batchesWritten.incr(1);
+    metrics.bytesWritten.incr(records.length);
+    metrics.txnsWritten.incr(numTxns);
+    
+    highestWrittenTxId = lastTxnId;
+    nextTxId = lastTxnId + 1;
+  }
+
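+  /**
+   * Handle a heartbeat from the writer. This simply validates the
+   * request, which as a side effect records a newer promised epoch or
+   * committed txid if one is included.
+   */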
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    checkRequest(reqInfo);
+  }
+  
+  /**
+   * Ensure that the given request is coming from the correct writer and in-order.
+   * @param reqInfo the request info
+   * @throws IOException if the request is invalid.
+   */
+  private synchronized void checkRequest(RequestInfo reqInfo) throws IOException {
+    // Invariant 25 from ZAB paper
+    if (reqInfo.getEpoch() < lastPromisedEpoch.get()) {
+      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
+          " is less than the last promised epoch " +
+          lastPromisedEpoch.get());
+    } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) {
+      // A newer client has arrived. Fence any previous writers by updating
+      // the promise.
+      updateLastPromisedEpoch(reqInfo.getEpoch());
+    }
+    
+    // Ensure that the IPCs are arriving in-order as expected.
+    checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial,
+        "IPC serial %s from client %s was not higher than prior highest " +
+        "IPC serial %s", reqInfo.getIpcSerialNumber(),
+        Server.getRemoteIp(),
+        currentEpochIpcSerial);
+    currentEpochIpcSerial = reqInfo.getIpcSerialNumber();
+
+    if (reqInfo.hasCommittedTxId()) {
+      Preconditions.checkArgument(
+          reqInfo.getCommittedTxId() >= committedTxnId.get(),
+          "Client trying to move committed txid backward from " +
+          committedTxnId.get() + " to " + reqInfo.getCommittedTxId());
+      
+      committedTxnId.set(reqInfo.getCommittedTxId());
+    }
+  }
+  
+  private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException {
+    checkRequest(reqInfo);
+    
+    if (reqInfo.getEpoch() != lastWriterEpoch.get()) {
+      throw new IOException("IPC's epoch " + reqInfo.getEpoch() +
+          " is not the current writer epoch  " +
+          lastWriterEpoch.get());
+    }
+  }
+  
+  public synchronized boolean isFormatted() {
+    return storage.isFormatted();
+  }
+
+  private void checkFormatted() throws JournalNotFormattedException {
+    if (!isFormatted()) {
+      throw new JournalNotFormattedException("Journal " +
+          storage.getSingularStorageDir() + " not formatted");
+    }
+  }
+
+  /**
+   * @throws JournalOutOfSyncException if the given expression is not true.
+   * The message of the exception is formatted using the 'msg' and
+   * 'formatArgs' parameters.
+   */
+  private void checkSync(boolean expression, String msg,
+      Object... formatArgs) throws JournalOutOfSyncException {
+    if (!expression) {
+      throw new JournalOutOfSyncException(String.format(msg, formatArgs));
+    }
+  }
+
+  /**
+   * @throws AssertionError if the given expression is not true.
+   * The message of the exception is formatted using the 'msg' and
+   * 'formatArgs' parameters.
+   * 
+   * This should be used in preference to Java's built-in assert in
+   * non-performance-critical paths, where a failure of this invariant
+   * might cause the protocol to lose data. 
+   */
+  private void alwaysAssert(boolean expression, String msg,
+      Object... formatArgs) {
+    if (!expression) {
+      throw new AssertionError(String.format(msg, formatArgs));
+    }
+  }
+  
+  /**
+   * Start a new segment at the given txid. The previous segment
+   * must have already been finalized.
+   */
+  public synchronized void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    assert fjm != null;
+    checkFormatted();
+    checkRequest(reqInfo);
+    
+    if (curSegment != null) {
+      LOG.warn("Client is requesting a new log segment " + txid + 
+          " though we are already writing " + curSegment + ". " +
+          "Aborting the current segment in order to begin the new one.");
+      // The writer may have lost a connection to us and is now
+      // re-connecting after the connection came back.
+      // We should abort our own old segment.
+      abortCurSegment();
+    }
+
+    // Paranoid sanity check: we should never overwrite a finalized log file.
+    // Additionally, if it's in-progress, it should have at most 1 transaction.
+    // This can happen if the writer crashes exactly at the start of a segment.
+    EditLogFile existing = fjm.getLogFile(txid);
+    if (existing != null) {
+      if (!existing.isInProgress()) {
+        throw new IllegalStateException("Already have a finalized segment " +
+            existing + " beginning at " + txid);
+      }
+      
+      // If it's in-progress, it should only contain one transaction,
+      // because the "startLogSegment" transaction is written alone at the
+      // start of each segment. 
+      existing.validateLog();
+      if (existing.getLastTxId() != existing.getFirstTxId()) {
+        throw new IllegalStateException("The log file " +
+            existing + " seems to contain valid transactions");
+      }
+    }
+    
+    long curLastWriterEpoch = lastWriterEpoch.get();
+    if (curLastWriterEpoch != reqInfo.getEpoch()) {
+      LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch +
+          " to " + reqInfo.getEpoch() + " for client " +
+          Server.getRemoteIp());
+      lastWriterEpoch.set(reqInfo.getEpoch());
+    }
+
+    // The fact that we are starting a segment at this txid indicates
+    // that any previous recovery for this same segment was aborted.
+    // Otherwise, no writer would have started writing. So, we can
+    // remove the record of the older segment here.
+    purgePaxosDecision(txid);
+    
+    curSegment = fjm.startLogSegment(txid);
+    curSegmentTxId = txid;
+    nextTxId = txid;
+  }
+  
+  /**
+   * Finalize the log segment at the given transaction ID.
+   */
+  public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    checkFormatted();
+    checkRequest(reqInfo);
+
+    boolean needsValidation = true;
+
+    // Finalizing the log that the writer was just writing.
+    if (startTxId == curSegmentTxId) {
+      if (curSegment != null) {
+        curSegment.close();
+        curSegment = null;
+        curSegmentTxId = HdfsConstants.INVALID_TXID;
+      }
+      
+      checkSync(nextTxId == endTxId + 1,
+          "Trying to finalize in-progress log segment %s to end at " +
+          "txid %s but only written up to txid %s",
+          startTxId, endTxId, nextTxId - 1);
+      // No need to validate the edit log if the client is finalizing
+      // the log segment that it was just writing to.
+      needsValidation = false;
+    }
+    
+    FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId);
+    if (elf == null) {
+      throw new JournalOutOfSyncException("No log file to finalize at " +
+          "transaction ID " + startTxId);
+    }
+
+    if (elf.isInProgress()) {
+      if (needsValidation) {
+        LOG.info("Validating log segment " + elf.getFile() + " about to be " +
+            "finalized");
+        elf.validateLog();
+  
+        checkSync(elf.getLastTxId() == endTxId,
+            "Trying to finalize in-progress log segment %s to end at " +
+            "txid %s but log %s on disk only contains up to txid %s",
+            startTxId, endTxId, elf.getFile(), elf.getLastTxId());
+      }
+      fjm.finalizeLogSegment(startTxId, endTxId);
+    } else {
+      Preconditions.checkArgument(endTxId == elf.getLastTxId(),
+          "Trying to re-finalize already finalized log " +
+              elf + " with different endTxId " + endTxId);
+    }
+
+    // Once logs are finalized, a different length will never be decided.
+    // During recovery, we treat a finalized segment the same as an accepted
+    // recovery. Thus, we no longer need to keep track of the previously-
+    // accepted decision. The existence of the finalized log segment is enough.
+    purgePaxosDecision(elf.getFirstTxId());
+  }
+  
+  /**
+   * @see JournalManager#purgeLogsOlderThan(long)
+   */
+  public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
+      long minTxIdToKeep) throws IOException {
+    checkFormatted();
+    checkRequest(reqInfo);
+    
+    storage.purgeDataOlderThan(minTxIdToKeep);
+  }
+  
+  /**
+   * Remove the previously-recorded 'accepted recovery' information
+   * for a given log segment, once it is no longer necessary. 
+   * @param segmentTxId the transaction ID to purge
+   * @throws IOException if the file could not be deleted
+   */
+  private void purgePaxosDecision(long segmentTxId) throws IOException {
+    File paxosFile = storage.getPaxosFile(segmentTxId);
+    if (paxosFile.exists()) {
+      if (!paxosFile.delete()) {
+        throw new IOException("Unable to delete paxos file " + paxosFile);
+      }
+    }
+  }
+
+  /**
+   * @see QJournalProtocol#getEditLogManifest(String, long)
+   */
+  public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
+      throws IOException {
+    // No need to checkRequest() here - anyone may ask for the list
+    // of segments.
+    checkFormatted();
+    
+    RemoteEditLogManifest manifest = new RemoteEditLogManifest(
+        fjm.getRemoteEditLogs(sinceTxId));
+    return manifest;
+  }
+
+  /**
+   * @return the current state of the given segment, or null if the
+   * segment does not exist.
+   */
+  private SegmentStateProto getSegmentInfo(long segmentTxId)
+      throws IOException {
+    EditLogFile elf = fjm.getLogFile(segmentTxId);
+    if (elf == null) {
+      return null;
+    }
+    if (elf.isInProgress()) {
+      elf.validateLog();
+    }
+    if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) {
+      LOG.info("Edit log file " + elf + " appears to be empty. " +
+          "Moving it aside...");
+      elf.moveAsideEmptyFile();
+      return null;
+    }
+    SegmentStateProto ret = SegmentStateProto.newBuilder()
+        .setStartTxId(segmentTxId)
+        .setEndTxId(elf.getLastTxId())
+        .setIsInProgress(elf.isInProgress())
+        .build();
+    LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " +
+        TextFormat.shortDebugString(ret));
+    return ret;
+  }
+
+  /**
+   * @see QJournalProtocol#prepareRecovery(RequestInfo, long)
+   */
+  public synchronized PrepareRecoveryResponseProto prepareRecovery(
+      RequestInfo reqInfo, long segmentTxId) throws IOException {
+    checkFormatted();
+    checkRequest(reqInfo);
+    
+    abortCurSegment();
+    
+    PrepareRecoveryResponseProto.Builder builder =
+        PrepareRecoveryResponseProto.newBuilder();
+
+    PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId);
+    completeHalfDoneAcceptRecovery(previouslyAccepted);
+
+    SegmentStateProto segInfo = getSegmentInfo(segmentTxId);
+    boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress();
+
+    if (previouslyAccepted != null && !hasFinalizedSegment) {
+      SegmentStateProto acceptedState = previouslyAccepted.getSegmentState();
+      assert acceptedState.getEndTxId() == segInfo.getEndTxId() :
+            "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" +
+            "on disk:       " + TextFormat.shortDebugString(segInfo);
+            
+      builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch())
+        .setSegmentState(previouslyAccepted.getSegmentState());
+    } else {
+      if (segInfo != null) {
+        builder.setSegmentState(segInfo);
+      }
+    }
+    
+    builder.setLastWriterEpoch(lastWriterEpoch.get());
+    if (committedTxnId.get() != HdfsConstants.INVALID_TXID) {
+      builder.setLastCommittedTxId(committedTxnId.get());
+    }
+    
+    PrepareRecoveryResponseProto resp = builder.build();
+    LOG.info("Prepared recovery for segment " + segmentTxId + ": " +
+        TextFormat.shortDebugString(resp));
+    return resp;
+  }
+  
+  /**
+   * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL)
+   */
+  public synchronized void acceptRecovery(RequestInfo reqInfo,
+      SegmentStateProto segment, URL fromUrl)
+      throws IOException {
+    checkFormatted();
+    checkRequest(reqInfo);
+    
+    abortCurSegment();
+
+    long segmentTxId = segment.getStartTxId();
+
+    // Basic sanity checks that the segment is well-formed and contains
+    // at least one transaction.
+    Preconditions.checkArgument(segment.getEndTxId() > 0 &&
+        segment.getEndTxId() >= segmentTxId,
+        "bad recovery state for segment %s: %s",
+        segmentTxId, TextFormat.shortDebugString(segment));
+    
+    PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId);
+    PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder()
+        .setAcceptedInEpoch(reqInfo.getEpoch())
+        .setSegmentState(segment)
+        .build();
+    
+    // If we previously acted on acceptRecovery() from a higher-numbered writer,
+    // this call is out of sync. We should never actually trigger this, since the
+    // checkRequest() call above should filter non-increasing epoch numbers.
+    if (oldData != null) {
+      alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(),
+          "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n",
+          oldData, newData);
+    }
+    
+    File syncedFile = null;
+    
+    SegmentStateProto currentSegment = getSegmentInfo(segmentTxId);
+    if (currentSegment == null ||
+        currentSegment.getEndTxId() != segment.getEndTxId()) {
+      if (currentSegment == null) {
+        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+            ": no current segment in place");
+        
+        // Update the highest txid for lag metrics
+        highestWrittenTxId = Math.max(segment.getEndTxId(),
+            highestWrittenTxId);
+      } else {
+        LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) +
+            ": old segment " + TextFormat.shortDebugString(currentSegment) +
+            " is not the right length");
+        
+        // Paranoid sanity check: if the new log is shorter than the log we
+        // currently have, we should not end up discarding any transactions
+        // which are already committed.
+        if (txnRange(currentSegment).contains(committedTxnId.get()) &&
+            !txnRange(segment).contains(committedTxnId.get())) {
+          throw new AssertionError(
+              "Cannot replace segment " +
+              TextFormat.shortDebugString(currentSegment) +
+              " with new segment " +
+              TextFormat.shortDebugString(segment) + 
+              ": would discard already-committed txn " +
+              committedTxnId.get());
+        }
+        
+        // Another paranoid check: we should not be asked to synchronize a log
+        // on top of a finalized segment.
+        alwaysAssert(currentSegment.getIsInProgress(),
+            "Should never be asked to synchronize a different log on top of an " +
+            "already-finalized segment");
+        
+        // If we're shortening the log, update our highest txid
+        // used for lag metrics.
+        if (txnRange(currentSegment).contains(highestWrittenTxId)) {
+          highestWrittenTxId = segment.getEndTxId();
+        }
+      }
+      syncedFile = syncLog(reqInfo, segment, fromUrl);
+      
+    } else {
+      LOG.info("Skipping download of log " +
+          TextFormat.shortDebugString(segment) +
+          ": already have up-to-date logs");
+    }
+    
+    // This is one of the few places in the protocol where we have a single
+    // RPC that results in two distinct actions:
+    //
+    // - 1) Downloads the new log segment data (above)
+    // - 2) Records the new Paxos data about the synchronized segment (below)
+    //
+    // These need to be treated as a transaction from the perspective
+    // of any external process. We do this by treating the persistPaxosData()
+    // success as the "commit" of an atomic transaction. If we fail before
+    // this point, the downloaded edit log will only exist at a temporary
+    // path, and thus not change any externally visible state. If we fail
+    // after this point, then any future prepareRecovery() call will see
+    // the Paxos data, and by calling completeHalfDoneAcceptRecovery() will
+    // roll forward the rename of the referenced log file.
+    //
+    // See also: HDFS-3955
+    //
+    // The fault points here are exercised by the randomized fault injection
+    // test case to ensure that this atomic "transaction" operates correctly.
+    JournalFaultInjector.get().beforePersistPaxosData();
+    persistPaxosData(segmentTxId, newData);
+    JournalFaultInjector.get().afterPersistPaxosData();
+
+    if (syncedFile != null) {
+      FileUtil.replaceFile(syncedFile,
+          storage.getInProgressEditLog(segmentTxId));
+    }
+
+    LOG.info("Accepted recovery for segment " + segmentTxId + ": " +
+        TextFormat.shortDebugString(newData));
+  }
+
+  private Range<Long> txnRange(SegmentStateProto seg) {
+    Preconditions.checkArgument(seg.hasEndTxId(),
+        "invalid segment: %s", seg);
+    return Ranges.closed(seg.getStartTxId(), seg.getEndTxId());
+  }
+
+  /**
+   * Synchronize a log segment from another JournalNode. The log is
+   * downloaded from the provided URL into a temporary location on disk,
+   * which is named based on the current request's epoch.
+   *
+   * @return the temporary location of the downloaded file
+   */
+  private File syncLog(RequestInfo reqInfo,
+      final SegmentStateProto segment, final URL url) throws IOException {
+    final File tmpFile = storage.getSyncLogTemporaryFile(
+        segment.getStartTxId(), reqInfo.getEpoch());
+    final List<File> localPaths = ImmutableList.of(tmpFile);
+
+    LOG.info("Synchronizing log " +
+        TextFormat.shortDebugString(segment) + " from " + url);
+    SecurityUtil.doAsLoginUser(
+        new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws IOException {
+            boolean success = false;
+            try {
+              TransferFsImage.doGetUrl(url, localPaths, storage, true);
+              assert tmpFile.exists();
+              success = true;
+            } finally {
+              if (!success) {
+                if (!tmpFile.delete()) {
+                  LOG.warn("Failed to delete temporary file " + tmpFile);
+                }
+              }
+            }
+            return null;
+          }
+        });
+    return tmpFile;
+  }
+  
+
+  /**
+   * In the case the node crashes in between downloading a log segment
+   * and persisting the associated paxos recovery data, the log segment
+   * will be left in its temporary location on disk. Given the paxos data,
+   * we can check if this was indeed the case, and &quot;roll forward&quot;
+   * the atomic operation.
+   * 
+   * See the inline comments in
+   * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more
+   * details.
+   *
+   * @throws IOException if the temporary file is unable to be renamed into
+   * place
+   */
+  private void completeHalfDoneAcceptRecovery(
+      PersistedRecoveryPaxosData paxosData) throws IOException {
+    if (paxosData == null) {
+      return;
+    }
+
+    long segmentId = paxosData.getSegmentState().getStartTxId();
+    long epoch = paxosData.getAcceptedInEpoch();
+    
+    File tmp = storage.getSyncLogTemporaryFile(segmentId, epoch);
+    
+    if (tmp.exists()) {
+      File dst = storage.getInProgressEditLog(segmentId);
+      LOG.info("Rolling forward previously half-completed synchronization: " +
+          tmp + " -> " + dst);
+      FileUtil.replaceFile(tmp, dst);
+    }
+  }
+
+  /**
+   * Retrieve the persisted data for recovering the given segment from disk.
+   */
+  private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId)
+      throws IOException {
+    File f = storage.getPaxosFile(segmentTxId);
+    if (!f.exists()) {
+      // Default instance has no fields filled in (they're optional)
+      return null;
+    }
+    
+    InputStream in = new FileInputStream(f);
+    try {
+      PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in);
+      Preconditions.checkState(ret != null &&
+          ret.getSegmentState().getStartTxId() == segmentTxId,
+          "Bad persisted data for segment %s: %s",
+          segmentTxId, ret);
+      return ret;
+    } finally {
+      IOUtils.closeStream(in);
+    }
+  }
+
+  /**
+   * Persist data for recovering the given segment from disk.
+   */
+  private void persistPaxosData(long segmentTxId,
+      PersistedRecoveryPaxosData newData) throws IOException {
+    File f = storage.getPaxosFile(segmentTxId);
+    boolean success = false;
+    AtomicFileOutputStream fos = new AtomicFileOutputStream(f);
+    try {
+      newData.writeDelimitedTo(fos);
+      fos.write('\n');
+      // Write human-readable data after the protobuf. This is only
+      // to assist in debugging -- it's not parsed at all.
+      OutputStreamWriter writer = new OutputStreamWriter(fos);
+      
+      writer.write(String.valueOf(newData));
+      writer.write('\n');
+      writer.flush();
+      
+      fos.flush();
+      success = true;
+    } finally {
+      if (success) {
+        IOUtils.closeStream(fos);
+      } else {
+        fos.abort();
+      }
+    }
+  }
+}

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used for injecting faults in QuorumJournalManager tests.
+ * Calls into this are a no-op in production code. 
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+public class JournalFaultInjector {
+  public static JournalFaultInjector instance = new JournalFaultInjector();
+
+  public static JournalFaultInjector get() {
+    return instance;
+  }
+
+  public void beforePersistPaxosData() throws IOException {}
+  public void afterPersistPaxosData() throws IOException {}
+}
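
For illustration, a hedged test sketch (not part of this commit) of how these
injection points might be exercised with Mockito: swap in a mock instance, force
an IOException at beforePersistPaxosData(), and restore the real no-op instance
afterwards. The surrounding test driver is elided.

    JournalFaultInjector faultInjector =
        JournalFaultInjector.instance = Mockito.mock(JournalFaultInjector.class);
    Mockito.doThrow(new IOException("injected fault")).when(faultInjector)
        .beforePersistPaxosData();
    try {
      // ... drive a recovery that persists paxos data and assert it fails ...
    } finally {
      JournalFaultInjector.instance = new JournalFaultInjector();
    }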

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+
+/**
+ * The server-side metrics for a journal from the JournalNode's
+ * perspective.
+ */
+@Metrics(about="Journal metrics", context="dfs")
+class JournalMetrics {
+  final MetricsRegistry registry = new MetricsRegistry("JournalNode");
+  
+  @Metric("Number of batches written since startup")
+  MutableCounterLong batchesWritten;
+  
+  @Metric("Number of txns written since startup")
+  MutableCounterLong txnsWritten;
+  
+  @Metric("Number of bytes written since startup")
+  MutableCounterLong bytesWritten;
+  
+  @Metric("Number of batches written where this node was lagging")
+  MutableCounterLong batchesWrittenWhileLagging;
+  
+  private final int[] QUANTILE_INTERVALS = new int[] {
+      1*60, // 1m
+      5*60, // 5m
+      60*60 // 1h
+  };
+  
+  MutableQuantiles[] syncsQuantiles;
+  
+  private final Journal journal;
+
+  JournalMetrics(Journal journal) {
+    this.journal = journal;
+    
+    syncsQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length];
+    for (int i = 0; i < syncsQuantiles.length; i++) {
+      int interval = QUANTILE_INTERVALS[i];
+      syncsQuantiles[i] = registry.newQuantiles(
+          "syncs" + interval + "s",
+          "Journal sync time", "ops", "latencyMicros", interval);
+    }
+  }
+  
+  public static JournalMetrics create(Journal j) {
+    JournalMetrics m = new JournalMetrics(j);
+    return DefaultMetricsSystem.instance().register(
+        m.getName(), null, m);
+  }
+
+  String getName() {
+    return "Journal-" + journal.getJournalId();
+  }
+
+  @Metric("Current writer's epoch")
+  public long getLastWriterEpoch() {
+    try {
+      return journal.getLastWriterEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
+  @Metric("Last accepted epoch")
+  public long getLastPromisedEpoch() {
+    try {
+      return journal.getLastPromisedEpoch();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
+  @Metric("The highest txid stored on this JN")
+  public long getLastWrittenTxId() {
+    return journal.getHighestWrittenTxId();
+  }
+  
+  @Metric("Number of transactions that this JN is lagging")
+  public long getCurrentLagTxns() {
+    try {
+      return journal.getCurrentLagTxns();
+    } catch (IOException e) {
+      return -1L;
+    }
+  }
+  
+  void addSync(long us) {
+    for (MutableQuantiles q : syncsQuantiles) {
+      q.add(us);
+    }
+  }
+}
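
For illustration, a sketch (not part of this commit) of how the owning Journal
might record one successful batch write against these metrics; the timing code
and the numTxns/records variables are placeholders.

    JournalMetrics metrics = JournalMetrics.create(journal);

    long startNanos = System.nanoTime();
    // ... write and sync the batch of edits ...
    long syncMicros = (System.nanoTime() - startNanos) / 1000;

    metrics.batchesWritten.incr();
    metrics.txnsWritten.incr(numTxns);
    metrics.bytesWritten.incr(records.length);
    metrics.addSync(syncMicros);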

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * The JournalNode is a daemon which allows namenodes using
+ * the QuorumJournalManager to log and retrieve edits stored
+ * remotely. It is a thin wrapper around a local edit log
+ * directory with the addition of facilities to participate
+ * in the quorum protocol.
+ */
+@InterfaceAudience.Private
+public class JournalNode implements Tool, Configurable {
+  public static final Log LOG = LogFactory.getLog(JournalNode.class);
+  private Configuration conf;
+  private JournalNodeRpcServer rpcServer;
+  private JournalNodeHttpServer httpServer;
+  private Map<String, Journal> journalsById = Maps.newHashMap();
+
+  private File localDir;
+
+  static {
+    HdfsConfiguration.init();
+  }
+  
+  /**
+   * When stopped, the daemon will exit with this code. 
+   */
+  private int resultCode = 0;
+
+  synchronized Journal getOrCreateJournal(String jid) throws IOException {
+    QuorumJournalManager.checkJournalId(jid);
+    
+    Journal journal = journalsById.get(jid);
+    if (journal == null) {
+      File logDir = getLogDir(jid);
+      LOG.info("Initializing journal in directory " + logDir);      
+      journal = new Journal(logDir, jid, new ErrorReporter());
+      journalsById.put(jid, journal);
+    }
+    
+    return journal;
+  }
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.localDir = new File(
+        conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim());
+  }
+
+  private static void validateAndCreateJournalDir(File dir) throws IOException {
+    if (!dir.isAbsolute()) {
+      throw new IllegalArgumentException(
+          "Journal dir '" + dir + "' should be an absolute path");
+    }
+
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Could not create journal dir '" +
+          dir + "'");
+    } else if (!dir.isDirectory()) {
+      throw new IOException("Journal directory '" + dir + "' is not " +
+          "a directory");
+    }
+    
+    if (!dir.canWrite()) {
+      throw new IOException("Unable to write to journal dir '" +
+          dir + "'");
+    }
+  }
+
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    start();
+    return join();
+  }
+
+  /**
+   * Start listening for edits via RPC.
+   */
+  public void start() throws IOException {
+    Preconditions.checkState(!isStarted(), "JN already running");
+    
+    validateAndCreateJournalDir(localDir);
+    
+    DefaultMetricsSystem.initialize("JournalNode");
+    JvmMetrics.create("JournalNode",
+        conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
+        DefaultMetricsSystem.instance());
+
+    InetSocketAddress socAddr = JournalNodeRpcServer.getAddress(conf);
+    SecurityUtil.login(conf, DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY, socAddr.getHostName());
+    
+    httpServer = new JournalNodeHttpServer(conf, this);
+    httpServer.start();
+
+    rpcServer = new JournalNodeRpcServer(conf, this);
+    rpcServer.start();
+  }
+
+  public boolean isStarted() {
+    return rpcServer != null;
+  }
+
+  /**
+   * @return the address the IPC server is bound to
+   */
+  public InetSocketAddress getBoundIpcAddress() {
+    return rpcServer.getAddress();
+  }
+  
+
+  public InetSocketAddress getBoundHttpAddress() {
+    return httpServer.getAddress();
+  }
+
+
+  /**
+   * Stop the daemon with the given status code
+   * @param rc the status code with which to exit (non-zero
+   * should indicate an error)
+   */
+  public void stop(int rc) {
+    this.resultCode = rc;
+    
+    if (rpcServer != null) { 
+      rpcServer.stop();
+    }
+
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+      } catch (IOException ioe) {
+        LOG.warn("Unable to stop HTTP server for " + this, ioe);
+      }
+    }
+    
+    for (Journal j : journalsById.values()) {
+      IOUtils.cleanup(LOG, j);
+    }
+  }
+
+  /**
+   * Wait for the daemon to exit.
+   * @return the result code (non-zero if error)
+   */
+  int join() throws InterruptedException {
+    if (rpcServer != null) {
+      rpcServer.join();
+    }
+    return resultCode;
+  }
+  
+  public void stopAndJoin(int rc) throws InterruptedException {
+    stop(rc);
+    join();
+  }
+
+  /**
+   * Return the directory inside our configured storage
+   * dir which corresponds to a given journal. 
+   * @param jid the journal identifier
+   * @return the file, which may or may not exist yet
+   */
+  private File getLogDir(String jid) {
+    String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+    Preconditions.checkArgument(jid != null &&
+        !jid.isEmpty(),
+        "bad journal identifier: %s", jid);
+    return new File(new File(dir), jid);
+  }
+
+  
+  private class ErrorReporter implements StorageErrorReporter {
+    @Override
+    public void reportErrorOnFile(File f) {
+      LOG.fatal("Error reported on file " + f + "... exiting",
+          new Exception());
+      stop(1);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.exit(ToolRunner.run(new JournalNode(), args));
+  }
+}
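
For illustration, a sketch (not part of this commit) of starting and stopping a
JournalNode in-process using only the methods shown above; the edits directory
path is a placeholder.

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/data/hadoop/journal");

    JournalNode jn = new JournalNode();
    jn.setConf(conf);
    jn.start();
    JournalNode.LOG.info("RPC bound to " + jn.getBoundIpcAddress()
        + ", HTTP bound to " + jn.getBoundHttpAddress());

    // ... serve edits until shutdown is requested ...
    jn.stopAndJoin(0);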

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.servlet.ServletContext;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil;
+
+/**
+ * Encapsulates the HTTP server started by the JournalNode.
+ */
+@InterfaceAudience.Private
+public class JournalNodeHttpServer {
+  public static final Log LOG = LogFactory.getLog(
+      JournalNodeHttpServer.class);
+
+  public static final String JN_ATTRIBUTE_KEY = "localjournal";
+
+  private HttpServer httpServer;
+  private int infoPort;
+  private JournalNode localJournalNode;
+
+  private final Configuration conf;
+
+  JournalNodeHttpServer(Configuration conf, JournalNode jn) {
+    this.conf = conf;
+    this.localJournalNode = jn;
+  }
+
+  void start() throws IOException {
+    final InetSocketAddress bindAddr = getAddress(conf);
+
+    // initialize the webserver for uploading/downloading files.
+    LOG.info("Starting web server as: "+ SecurityUtil.getServerPrincipal(conf
+        .get(DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY),
+        bindAddr.getHostName()));
+
+    int tmpInfoPort = bindAddr.getPort();
+    httpServer = new HttpServer("journal", bindAddr.getHostName(),
+        tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf
+            .get(DFS_ADMIN, " "))) {
+      {
+        if (UserGroupInformation.isSecurityEnabled()) {
+          initSpnego(conf, DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY,
+              DFS_JOURNALNODE_KEYTAB_FILE_KEY);
+        }
+      }
+    };
+    httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode);
+    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+    httpServer.addInternalServlet("getJournal", "/getJournal",
+        GetJournalEditServlet.class, true);
+    httpServer.start();
+
+    // The web-server port can be ephemeral... ensure we have the correct info
+    infoPort = httpServer.getPort();
+
+    LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
+  }
+
+  void stop() throws IOException {
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+  
+  /**
+   * Return the actual address bound to by the running server.
+   */
+  public InetSocketAddress getAddress() {
+    InetSocketAddress addr = httpServer.getListenerAddress();
+    assert addr.getPort() != 0;
+    return addr;
+  }
+
+  private static InetSocketAddress getAddress(Configuration conf) {
+    String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT);
+    return NetUtils.createSocketAddr(addr,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT,
+        DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY);
+  }
+
+  public static Journal getJournalFromContext(ServletContext context, String jid)
+      throws IOException {
+    JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY);
+    return jn.getOrCreateJournal(jid);
+  }
+
+  public static Configuration getConfFromContext(ServletContext context) {
+    return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF);
+  }
+}
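
For illustration, a hedged sketch (not part of this commit) of how a servlet
registered on this server could use the two static context helpers above. The
servlet class and the "jid" request parameter are hypothetical names for this
example; the getJournal servlet registered above is the real consumer.

    public class ExampleJournalStatusServlet extends HttpServlet {
      @Override
      protected void doGet(HttpServletRequest req, HttpServletResponse resp)
          throws ServletException, IOException {
        ServletContext context = getServletContext();
        Configuration conf = JournalNodeHttpServer.getConfFromContext(context);
        Journal journal = JournalNodeHttpServer.getJournalFromContext(
            context, req.getParameter("jid"));
        resp.setContentType("text/plain");
        resp.getWriter().println("Edits dir: "
            + conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY));
        resp.getWriter().println("Highest written txid: "
            + journal.getHighestWrittenTxId());
      }
    }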

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1417596&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Dec  5 19:22:17 2012
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB;
+import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.protobuf.BlockingService;
+
+class JournalNodeRpcServer implements QJournalProtocol {
+
+  private static final int HANDLER_COUNT = 5;
+  private JournalNode jn;
+  private Server server;
+
+  JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
+    this.jn = jn;
+    
+    Configuration confCopy = new Configuration(conf);
+    
+    // Ensure that nagling doesn't kick in, which could cause latency issues.
+    confCopy.setBoolean(
+        CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
+        true);
+    
+    InetSocketAddress addr = getAddress(confCopy);
+    RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class,
+        ProtobufRpcEngine.class);
+    QJournalProtocolServerSideTranslatorPB translator =
+        new QJournalProtocolServerSideTranslatorPB(this);
+    BlockingService service = QJournalProtocolService
+        .newReflectiveBlockingService(translator);
+    
+    this.server = new RPC.Builder(confCopy)
+      .setProtocol(QJournalProtocolPB.class)
+      .setInstance(service)
+      .setBindAddress(addr.getHostName())
+      .setPort(addr.getPort())
+      .setNumHandlers(HANDLER_COUNT)
+      .setVerbose(false)
+      .build();
+
+    // set service-level authorization security policy
+    if (confCopy.getBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+      server.refreshServiceAcl(confCopy, new HDFSPolicyProvider());
+    }
+  }
+
+  void start() {
+    this.server.start();
+  }
+
+  public InetSocketAddress getAddress() {
+    return server.getListenerAddress();
+  }
+  
+  void join() throws InterruptedException {
+    this.server.join();
+  }
+  
+  void stop() {
+    this.server.stop();
+  }
+  
+  static InetSocketAddress getAddress(Configuration conf) {
+    String addr = conf.get(
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT);
+    return NetUtils.createSocketAddr(addr, 0,
+        DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY);
+  }
+
+  @Override
+  public boolean isFormatted(String journalId) throws IOException {
+    return jn.getOrCreateJournal(journalId).isFormatted();
+  }
+
+  @Override
+  public GetJournalStateResponseProto getJournalState(String journalId)
+        throws IOException {
+    long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch(); 
+    return GetJournalStateResponseProto.newBuilder()
+        .setLastPromisedEpoch(epoch)
+        .setHttpPort(jn.getBoundHttpAddress().getPort())
+        .build();
+  }
+
+  @Override
+  public NewEpochResponseProto newEpoch(String journalId,
+      NamespaceInfo nsInfo,
+      long epoch) throws IOException {
+    return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
+  }
+
+  @Override
+  public void format(String journalId, NamespaceInfo nsInfo)
+      throws IOException {
+    jn.getOrCreateJournal(journalId).format(nsInfo);
+  }
+
+  @Override
+  public void journal(RequestInfo reqInfo,
+      long segmentTxId, long firstTxnId,
+      int numTxns, byte[] records) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+       .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
+  }
+  
+  @Override
+  public void heartbeat(RequestInfo reqInfo) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .heartbeat(reqInfo);
+  }
+
+  @Override
+  public void startLogSegment(RequestInfo reqInfo, long txid)
+      throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .startLogSegment(reqInfo, txid);
+  }
+
+  @Override
+  public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
+      long endTxId) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .finalizeLogSegment(reqInfo, startTxId, endTxId);
+  }
+
+  @Override
+  public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
+      throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+      .purgeLogsOlderThan(reqInfo, minTxIdToKeep);
+  }
+
+  @Override
+  public GetEditLogManifestResponseProto getEditLogManifest(String jid,
+      long sinceTxId) throws IOException {
+    
+    RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
+        .getEditLogManifest(sinceTxId);
+    
+    return GetEditLogManifestResponseProto.newBuilder()
+        .setManifest(PBHelper.convert(manifest))
+        .setHttpPort(jn.getBoundHttpAddress().getPort())
+        .build();
+  }
+
+  @Override
+  public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
+      long segmentTxId) throws IOException {
+    return jn.getOrCreateJournal(reqInfo.getJournalId())
+        .prepareRecovery(reqInfo, segmentTxId);
+  }
+
+  @Override
+  public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log,
+      URL fromUrl) throws IOException {
+    jn.getOrCreateJournal(reqInfo.getJournalId())
+        .acceptRecovery(reqInfo, log, fromUrl);
+  }
+
+}
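
For illustration, a hedged sketch (not part of this commit excerpt) of how a
client could obtain a protobuf RPC proxy for this server. The host:port is a
placeholder, and callers would normally wrap the raw proxy in the client-side
translator rather than invoke it directly.

    Configuration conf = new Configuration();
    RPC.setProtocolEngine(conf, QJournalProtocolPB.class, ProtobufRpcEngine.class);
    InetSocketAddress addr = NetUtils.createSocketAddr("jn.example.com:8485");
    QJournalProtocolPB proxy = RPC.getProxy(QJournalProtocolPB.class,
        RPC.getProtocolVersion(QJournalProtocolPB.class), addr, conf);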

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Wed Dec  5 19:22:17 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
    */
   static public enum NodeType {
     NAME_NODE,
-    DATA_NODE;
+    DATA_NODE,
+    JOURNAL_NODE;
   }
 
   /** Startup options */

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Dec  5 19:22:17 2012
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Preconditions;
+
 
 
 /**
@@ -76,7 +78,7 @@ public abstract class Storage extends St
   /** Layout versions of 0.20.203 release */
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
-  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
+  public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
@@ -752,6 +754,15 @@ public abstract class Storage extends St
     return storageDirs.get(idx);
   }
   
+  /**
+   * @return the storage directory, with the precondition that this storage
+   * has exactly one storage directory
+   */
+  public StorageDirectory getSingularStorageDir() {
+    Preconditions.checkState(storageDirs.size() == 1);
+    return storageDirs.get(0);
+  }
+  
   protected void addStorageDir(StorageDirectory sd) {
     storageDirs.add(sd);
   }
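
For illustration, a sketch (not part of this commit) of the intended use of the
new accessor by a storage type known to manage exactly one directory, as a
JournalNode's per-journal storage would; getCurrentDir() is the existing
StorageDirectory accessor.

    StorageDirectory sd = storage.getSingularStorageDir();
    File currentDir = sd.getCurrentDir();
    // A second configured storage directory would trip the precondition above.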

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Wed Dec  5 19:22:17 2012
@@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends 
   }
 
   @Override // EditLogOutputStream
-  protected void flushAndSync() throws IOException {
+  protected void flushAndSync(boolean durable) throws IOException {
     assert out.getLength() == 0 : "Output buffer is not empty";
     
     int numReadyTxns = doubleBuf.countReadyTxns();


