Subject: svn commit: r1417596 [3/6] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java...
Date: Wed, 05 Dec 2012 19:22:25 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20121205192234.BE2EF2388B56@eris.apache.org>

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/GetJournalEditServlet.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.HashSet; +import java.util.Set; + +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.GetImageServlet; +import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode; +import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ServletUtil; +import org.apache.hadoop.util.StringUtils; + +/** + * This servlet is used in two cases: + *
    + *
+ * <ul>
+ * <li>The QuorumJournalManager, when reading edits, fetches the edit streams
+ * from the journal nodes.</li>
+ * <li>During edits synchronization, one journal node will fetch edits from
+ * another journal node.</li>
+ * </ul>
+ */ +@InterfaceAudience.Private +public class GetJournalEditServlet extends HttpServlet { + + private static final long serialVersionUID = -4635891628211723009L; + private static final Log LOG = LogFactory.getLog(GetJournalEditServlet.class); + + static final String STORAGEINFO_PARAM = "storageInfo"; + static final String JOURNAL_ID_PARAM = "jid"; + static final String SEGMENT_TXID_PARAM = "segmentTxId"; + + protected boolean isValidRequestor(HttpServletRequest request, Configuration conf) + throws IOException { + String remotePrincipal = request.getUserPrincipal().getName(); + String remoteShortName = request.getRemoteUser(); + if (remotePrincipal == null) { // This really shouldn't happen... + LOG.warn("Received null remoteUser while authorizing access to " + + "GetJournalEditServlet"); + return false; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Validating request made by " + remotePrincipal + + " / " + remoteShortName + ". This user is: " + + UserGroupInformation.getLoginUser()); + } + + Set validRequestors = new HashSet(); + validRequestors.addAll(DFSUtil.getAllNnPrincipals(conf)); + validRequestors.add( + SecurityUtil.getServerPrincipal(conf + .get(DFSConfigKeys.DFS_SECONDARY_NAMENODE_USER_NAME_KEY), + SecondaryNameNode.getHttpAddress(conf).getHostName())); + + // Check the full principal name of all the configured valid requestors. + for (String v : validRequestors) { + if (LOG.isDebugEnabled()) + LOG.debug("isValidRequestor is comparing to valid requestor: " + v); + if (v != null && v.equals(remotePrincipal)) { + if (LOG.isDebugEnabled()) + LOG.debug("isValidRequestor is allowing: " + remotePrincipal); + return true; + } + } + + // Additionally, we compare the short name of the requestor to this JN's + // username, because we want to allow requests from other JNs during + // recovery, but we can't enumerate the full list of JNs. 
+ if (remoteShortName.equals( + UserGroupInformation.getLoginUser().getShortUserName())) { + if (LOG.isDebugEnabled()) + LOG.debug("isValidRequestor is allowing other JN principal: " + + remotePrincipal); + return true; + } + + if (LOG.isDebugEnabled()) + LOG.debug("isValidRequestor is rejecting: " + remotePrincipal); + return false; + } + + private boolean checkRequestorOrSendError(Configuration conf, + HttpServletRequest request, HttpServletResponse response) + throws IOException { + if (UserGroupInformation.isSecurityEnabled() + && !isValidRequestor(request, conf)) { + response.sendError(HttpServletResponse.SC_FORBIDDEN, + "Only Namenode and another JournalNode may access this servlet"); + LOG.warn("Received non-NN/JN request for edits from " + + request.getRemoteHost()); + return false; + } + return true; + } + + private boolean checkStorageInfoOrSendError(JNStorage storage, + HttpServletRequest request, HttpServletResponse response) + throws IOException { + String myStorageInfoString = storage.toColonSeparatedString(); + String theirStorageInfoString = request.getParameter(STORAGEINFO_PARAM); + + if (theirStorageInfoString != null + && !myStorageInfoString.equals(theirStorageInfoString)) { + String msg = "This node has storage info '" + myStorageInfoString + + "' but the requesting node expected '" + + theirStorageInfoString + "'"; + + response.sendError(HttpServletResponse.SC_FORBIDDEN, msg); + LOG.warn("Received an invalid request file transfer request from " + + request.getRemoteAddr() + ": " + msg); + return false; + } + return true; + } + + @Override + public void doGet(final HttpServletRequest request, + final HttpServletResponse response) throws ServletException, IOException { + FileInputStream editFileIn = null; + try { + final ServletContext context = getServletContext(); + final Configuration conf = (Configuration) getServletContext() + .getAttribute(JspHelper.CURRENT_CONF); + final String journalId = request.getParameter(JOURNAL_ID_PARAM); + QuorumJournalManager.checkJournalId(journalId); + final JNStorage storage = JournalNodeHttpServer + .getJournalFromContext(context, journalId).getStorage(); + + // Check security + if (!checkRequestorOrSendError(conf, request, response)) { + return; + } + + // Check that the namespace info is correct + if (!checkStorageInfoOrSendError(storage, request, response)) { + return; + } + + long segmentTxId = ServletUtil.parseLongParam(request, + SEGMENT_TXID_PARAM); + + FileJournalManager fjm = storage.getJournalManager(); + File editFile; + + synchronized (fjm) { + // Synchronize on the FJM so that the file doesn't get finalized + // out from underneath us while we're in the process of opening + // it up. + EditLogFile elf = fjm.getLogFile( + segmentTxId); + if (elf == null) { + response.sendError(HttpServletResponse.SC_NOT_FOUND, + "No edit log found starting at txid " + segmentTxId); + return; + } + editFile = elf.getFile(); + GetImageServlet.setVerificationHeaders(response, editFile); + GetImageServlet.setFileNameHeaders(response, editFile); + editFileIn = new FileInputStream(editFile); + } + + DataTransferThrottler throttler = GetImageServlet.getThrottler(conf); + + // send edits + TransferFsImage.getFileServer(response, editFile, editFileIn, throttler); + + } catch (Throwable t) { + String errMsg = "getedit failed. 
" + StringUtils.stringifyException(t); + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, errMsg); + throw new IOException(errMsg); + } finally { + IOUtils.closeStream(editFileIn); + } + } + + public static String buildPath(String journalId, long segmentTxId, + NamespaceInfo nsInfo) { + StringBuilder path = new StringBuilder("/getJournal?"); + try { + path.append(JOURNAL_ID_PARAM).append("=") + .append(URLEncoder.encode(journalId, "UTF-8")); + path.append("&" + SEGMENT_TXID_PARAM).append("=") + .append(segmentTxId); + path.append("&" + STORAGEINFO_PARAM).append("=") + .append(URLEncoder.encode(nsInfo.toColonSeparatedString(), "UTF-8")); + } catch (UnsupportedEncodingException e) { + // Never get here -- everyone supports UTF-8 + throw new RuntimeException(e); + } + return path.toString(); + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; + +import com.google.common.collect.ImmutableList; + +/** + * A {@link Storage} implementation for the {@link JournalNode}. + * + * The JN has a storage directory for each namespace for which it stores + * metadata. There is only a single directory per JN in the current design. 
+ */ +class JNStorage extends Storage { + + private final FileJournalManager fjm; + private final StorageDirectory sd; + private StorageState state; + + + private static final List CURRENT_DIR_PURGE_REGEXES = + ImmutableList.of( + Pattern.compile("edits_\\d+-(\\d+)"), + Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?")); + + private static final List PAXOS_DIR_PURGE_REGEXES = + ImmutableList.of(Pattern.compile("(\\d+)")); + + /** + * @param logDir the path to the directory in which data will be stored + * @param errorReporter a callback to report errors + * @throws IOException + */ + protected JNStorage(File logDir, StorageErrorReporter errorReporter) throws IOException { + super(NodeType.JOURNAL_NODE); + + sd = new StorageDirectory(logDir); + this.addStorageDir(sd); + this.fjm = new FileJournalManager(sd, errorReporter); + + analyzeStorage(); + } + + FileJournalManager getJournalManager() { + return fjm; + } + + @Override + public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException { + return false; + } + + /** + * Find an edits file spanning the given transaction ID range. + * If no such file exists, an exception is thrown. + */ + File findFinalizedEditsFile(long startTxId, long endTxId) throws IOException { + File ret = new File(sd.getCurrentDir(), + NNStorage.getFinalizedEditsFileName(startTxId, endTxId)); + if (!ret.exists()) { + throw new IOException( + "No edits file for range " + startTxId + "-" + endTxId); + } + return ret; + } + + /** + * @return the path for an in-progress edits file starting at the given + * transaction ID. This does not verify existence of the file. + */ + File getInProgressEditLog(long startTxId) { + return new File(sd.getCurrentDir(), + NNStorage.getInProgressEditsFileName(startTxId)); + } + + /** + * @param segmentTxId the first txid of the segment + * @param epoch the epoch number of the writer which is coordinating + * recovery + * @return the temporary path in which an edits log should be stored + * while it is being downloaded from a remote JournalNode + */ + File getSyncLogTemporaryFile(long segmentTxId, long epoch) { + String name = NNStorage.getInProgressEditsFileName(segmentTxId) + + ".epoch=" + epoch; + return new File(sd.getCurrentDir(), name); + } + + /** + * @return the path for the file which contains persisted data for the + * paxos-like recovery process for the given log segment. + */ + File getPaxosFile(long segmentTxId) { + return new File(getPaxosDir(), String.valueOf(segmentTxId)); + } + + File getPaxosDir() { + return new File(sd.getCurrentDir(), "paxos"); + } + + /** + * Remove any log files and associated paxos files which are older than + * the given txid. + */ + void purgeDataOlderThan(long minTxIdToKeep) throws IOException { + purgeMatching(sd.getCurrentDir(), + CURRENT_DIR_PURGE_REGEXES, minTxIdToKeep); + purgeMatching(getPaxosDir(), PAXOS_DIR_PURGE_REGEXES, minTxIdToKeep); + } + + /** + * Purge files in the given directory which match any of the set of patterns. + * The patterns must have a single numeric capture group which determines + * the associated transaction ID of the file. Only those files for which + * the transaction ID is less than the minTxIdToKeep parameter + * are removed. 
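As a concrete illustration of how these patterns drive purging, the following standalone sketch (the class name and sample file names are invented for illustration; the two patterns are copied from CURRENT_DIR_PURGE_REGEXES above) extracts the transaction ID that purgeMatching() compares against minTxIdToKeep:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class PurgePatternDemo {
      public static void main(String[] args) {
        // Same patterns as CURRENT_DIR_PURGE_REGEXES above.
        Pattern[] patterns = {
            Pattern.compile("edits_\\d+-(\\d+)"),
            Pattern.compile("edits_inprogress_(\\d+)(?:\\..*)?")
        };
        // Illustrative file names as they might appear under current/.
        String[] names = {
            "edits_0000000000000000001-0000000000000000100",   // finalized segment
            "edits_inprogress_0000000000000000101",            // open segment
            "edits_inprogress_0000000000000000101.epoch=3"     // temp file from a sync
        };
        for (String name : names) {
          for (Pattern p : patterns) {
            Matcher m = p.matcher(name);
            if (m.matches()) {
              // group(1) is the txid compared to minTxIdToKeep.
              System.out.println(name + " -> txid " + Long.parseLong(m.group(1)));
            }
          }
        }
      }
    }

For a finalized segment the captured group is its last transaction ID, so a file is only removed once every transaction it contains is older than minTxIdToKeep.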
+ */ + private static void purgeMatching(File dir, List patterns, + long minTxIdToKeep) throws IOException { + + for (File f : FileUtil.listFiles(dir)) { + if (!f.isFile()) continue; + + for (Pattern p : patterns) { + Matcher matcher = p.matcher(f.getName()); + if (matcher.matches()) { + // This parsing will always succeed since the group(1) is + // /\d+/ in the regex itself. + long txid = Long.valueOf(matcher.group(1)); + if (txid < minTxIdToKeep) { + LOG.info("Purging no-longer needed file " + txid); + if (!f.delete()) { + LOG.warn("Unable to delete no-longer-needed data " + + f); + } + break; + } + } + } + } + } + + void format(NamespaceInfo nsInfo) throws IOException { + setStorageInfo(nsInfo); + LOG.info("Formatting journal storage directory " + + sd + " with nsid: " + getNamespaceID()); + // Unlock the directory before formatting, because we will + // re-analyze it after format(). The analyzeStorage() call + // below is reponsible for re-locking it. This is a no-op + // if the storage is not currently locked. + unlockAll(); + sd.clearDirectory(); + writeProperties(sd); + if (!getPaxosDir().mkdirs()) { + throw new IOException("Could not create paxos dir: " + getPaxosDir()); + } + analyzeStorage(); + } + + + void analyzeStorage() throws IOException { + this.state = sd.analyzeStorage(StartupOption.REGULAR, this); + if (state == StorageState.NORMAL) { + readProperties(sd); + } + } + + void checkConsistentNamespace(NamespaceInfo nsInfo) + throws IOException { + if (nsInfo.getNamespaceID() != getNamespaceID()) { + throw new IOException("Incompatible namespaceID for journal " + + this.sd + ": NameNode has nsId " + nsInfo.getNamespaceID() + + " but storage has nsId " + getNamespaceID()); + } + + if (!nsInfo.getClusterID().equals(getClusterID())) { + throw new IOException("Incompatible clusterID for journal " + + this.sd + ": NameNode has clusterId '" + nsInfo.getClusterID() + + "' but storage has clusterId '" + getClusterID() + "'"); + + } + } + + public void close() throws IOException { + LOG.info("Closing journal storage for " + sd); + unlockAll(); + } + + public boolean isFormatted() { + return state == StorageState.NORMAL; + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,953 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.net.URL; +import java.security.PrivilegedExceptionAction; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.JournalOutOfSyncException; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; +import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; +import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.JournalManager; +import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.util.AtomicFileOutputStream; +import org.apache.hadoop.hdfs.util.BestEffortLongFile; +import org.apache.hadoop.hdfs.util.PersistentLongFile; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.security.SecurityUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; +import com.google.common.collect.Ranges; +import com.google.protobuf.TextFormat; + +/** + * A JournalNode can manage journals for several clusters at once. + * Each such journal is entirely independent despite being hosted by + * the same JVM. 
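For example (the hostnames and nameservice names are illustrative, and 8485 is assumed to be the default JournalNode RPC port), two HA nameservices can share the same three JournalNodes simply by using distinct journal ids in their shared-edits URIs:

    dfs.namenode.shared.edits.dir for nameservice ns1:
        qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/ns1
    dfs.namenode.shared.edits.dir for nameservice ns2:
        qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/ns2

Each journal id maps to its own Journal instance and its own storage directory under dfs.journalnode.edits.dir on every JournalNode (see getOrCreateJournal() and getLogDir() later in this patch), so the two edit logs never interact.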
+ */ +class Journal implements Closeable { + static final Log LOG = LogFactory.getLog(Journal.class); + + + // Current writing state + private EditLogOutputStream curSegment; + private long curSegmentTxId = HdfsConstants.INVALID_TXID; + private long nextTxId = HdfsConstants.INVALID_TXID; + private long highestWrittenTxId = 0; + + private final String journalId; + + private final JNStorage storage; + + /** + * When a new writer comes along, it asks each node to promise + * to ignore requests from any previous writer, as identified + * by epoch number. In order to make such a promise, the epoch + * number of that writer is stored persistently on disk. + */ + private PersistentLongFile lastPromisedEpoch; + + /** + * Each IPC that comes from a given client contains a serial number + * which only increases from the client's perspective. Whenever + * we switch epochs, we reset this back to -1. Whenever an IPC + * comes from a client, we ensure that it is strictly higher + * than any previous IPC. This guards against any bugs in the IPC + * layer that would re-order IPCs or cause a stale retry from an old + * request to resurface and confuse things. + */ + private long currentEpochIpcSerial = -1; + + /** + * The epoch number of the last writer to actually write a transaction. + * This is used to differentiate log segments after a crash at the very + * beginning of a segment. See the the 'testNewerVersionOfSegmentWins' + * test case. + */ + private PersistentLongFile lastWriterEpoch; + + /** + * Lower-bound on the last committed transaction ID. This is not + * depended upon for correctness, but acts as a sanity check + * during the recovery procedures, and as a visibility mark + * for clients reading in-progress logs. + */ + private BestEffortLongFile committedTxnId; + + private static final String LAST_PROMISED_FILENAME = "last-promised-epoch"; + private static final String LAST_WRITER_EPOCH = "last-writer-epoch"; + private static final String COMMITTED_TXID_FILENAME = "committed-txid"; + + private final FileJournalManager fjm; + + private final JournalMetrics metrics; + + + Journal(File logDir, String journalId, + StorageErrorReporter errorReporter) throws IOException { + storage = new JNStorage(logDir, errorReporter); + this.journalId = journalId; + + refreshCachedData(); + + this.fjm = storage.getJournalManager(); + + this.metrics = JournalMetrics.create(this); + + EditLogFile latest = scanStorageForLatestEdits(); + if (latest != null) { + highestWrittenTxId = latest.getLastTxId(); + } + } + + /** + * Reload any data that may have been cached. This is necessary + * when we first load the Journal, but also after any formatting + * operation, since the cached data is no longer relevant. + */ + private synchronized void refreshCachedData() { + IOUtils.closeStream(committedTxnId); + + File currentDir = storage.getSingularStorageDir().getCurrentDir(); + this.lastPromisedEpoch = new PersistentLongFile( + new File(currentDir, LAST_PROMISED_FILENAME), 0); + this.lastWriterEpoch = new PersistentLongFile( + new File(currentDir, LAST_WRITER_EPOCH), 0); + this.committedTxnId = new BestEffortLongFile( + new File(currentDir, COMMITTED_TXID_FILENAME), + HdfsConstants.INVALID_TXID); + } + + /** + * Scan the local storage directory, and return the segment containing + * the highest transaction. + * @return the EditLogFile with the highest transactions, or null + * if no files exist. 
+ */ + private synchronized EditLogFile scanStorageForLatestEdits() throws IOException { + if (!fjm.getStorageDirectory().getCurrentDir().exists()) { + return null; + } + + LOG.info("Scanning storage " + fjm); + List files = fjm.getLogFiles(0); + + while (!files.isEmpty()) { + EditLogFile latestLog = files.remove(files.size() - 1); + latestLog.validateLog(); + LOG.info("Latest log is " + latestLog); + if (latestLog.getLastTxId() == HdfsConstants.INVALID_TXID) { + // the log contains no transactions + LOG.warn("Latest log " + latestLog + " has no transactions. " + + "moving it aside and looking for previous log"); + latestLog.moveAsideEmptyFile(); + } else { + return latestLog; + } + } + + LOG.info("No files in " + fjm); + return null; + } + + /** + * Format the local storage with the given namespace. + */ + void format(NamespaceInfo nsInfo) throws IOException { + Preconditions.checkState(nsInfo.getNamespaceID() != 0, + "can't format with uninitialized namespace info: %s", + nsInfo); + LOG.info("Formatting " + this + " with namespace info: " + + nsInfo); + storage.format(nsInfo); + refreshCachedData(); + } + + /** + * Unlock and release resources. + */ + @Override // Closeable + public void close() throws IOException { + storage.close(); + + IOUtils.closeStream(committedTxnId); + } + + JNStorage getStorage() { + return storage; + } + + String getJournalId() { + return journalId; + } + + /** + * @return the last epoch which this node has promised not to accept + * any lower epoch, or 0 if no promises have been made. + */ + synchronized long getLastPromisedEpoch() throws IOException { + checkFormatted(); + return lastPromisedEpoch.get(); + } + + synchronized public long getLastWriterEpoch() throws IOException { + checkFormatted(); + return lastWriterEpoch.get(); + } + + synchronized long getCommittedTxnIdForTests() throws IOException { + return committedTxnId.get(); + } + + synchronized long getCurrentLagTxns() throws IOException { + long committed = committedTxnId.get(); + if (committed == 0) { + return 0; + } + + return Math.max(committed - highestWrittenTxId, 0L); + } + + synchronized long getHighestWrittenTxId() { + return highestWrittenTxId; + } + + @VisibleForTesting + JournalMetrics getMetricsForTests() { + return metrics; + } + + /** + * Try to create a new epoch for this journal. + * @param nsInfo the namespace, which is verified for consistency or used to + * format, if the Journal has not yet been written to. + * @param epoch the epoch to start + * @return the status information necessary to begin recovery + * @throws IOException if the node has already made a promise to another + * writer with a higher epoch number, if the namespace is inconsistent, + * or if a disk error occurs. + */ + synchronized NewEpochResponseProto newEpoch( + NamespaceInfo nsInfo, long epoch) throws IOException { + + checkFormatted(); + storage.checkConsistentNamespace(nsInfo); + + // Check that the new epoch being proposed is in fact newer than + // any other that we've promised. 
+ if (epoch <= getLastPromisedEpoch()) { + throw new IOException("Proposed epoch " + epoch + " <= last promise " + + getLastPromisedEpoch()); + } + + updateLastPromisedEpoch(epoch); + abortCurSegment(); + + NewEpochResponseProto.Builder builder = + NewEpochResponseProto.newBuilder(); + + EditLogFile latestFile = scanStorageForLatestEdits(); + + if (latestFile != null) { + builder.setLastSegmentTxId(latestFile.getFirstTxId()); + } + + return builder.build(); + } + + private void updateLastPromisedEpoch(long newEpoch) throws IOException { + LOG.info("Updating lastPromisedEpoch from " + lastPromisedEpoch.get() + + " to " + newEpoch + " for client " + Server.getRemoteIp()); + lastPromisedEpoch.set(newEpoch); + + // Since we have a new writer, reset the IPC serial - it will start + // counting again from 0 for this writer. + currentEpochIpcSerial = -1; + } + + private void abortCurSegment() throws IOException { + if (curSegment == null) { + return; + } + + curSegment.abort(); + curSegment = null; + curSegmentTxId = HdfsConstants.INVALID_TXID; + } + + /** + * Write a batch of edits to the journal. + * {@see QJournalProtocol#journal(RequestInfo, long, long, int, byte[])} + */ + synchronized void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, + int numTxns, byte[] records) throws IOException { + checkFormatted(); + checkWriteRequest(reqInfo); + + checkSync(curSegment != null, + "Can't write, no segment open"); + + if (curSegmentTxId != segmentTxId) { + // Sanity check: it is possible that the writer will fail IPCs + // on both the finalize() and then the start() of the next segment. + // This could cause us to continue writing to an old segment + // instead of rolling to a new one, which breaks one of the + // invariants in the design. If it happens, abort the segment + // and throw an exception. + JournalOutOfSyncException e = new JournalOutOfSyncException( + "Writer out of sync: it thinks it is writing segment " + segmentTxId + + " but current segment is " + curSegmentTxId); + abortCurSegment(); + throw e; + } + + checkSync(nextTxId == firstTxnId, + "Can't write txid " + firstTxnId + " expecting nextTxId=" + nextTxId); + + long lastTxnId = firstTxnId + numTxns - 1; + if (LOG.isTraceEnabled()) { + LOG.trace("Writing txid " + firstTxnId + "-" + lastTxnId); + } + + // If the edit has already been marked as committed, we know + // it has been fsynced on a quorum of other nodes, and we are + // "catching up" with the rest. Hence we do not need to fsync. + boolean isLagging = lastTxnId <= committedTxnId.get(); + boolean shouldFsync = !isLagging; + + curSegment.writeRaw(records, 0, records.length); + curSegment.setReadyToFlush(); + Stopwatch sw = new Stopwatch(); + sw.start(); + curSegment.flush(shouldFsync); + sw.stop(); + + metrics.addSync(sw.elapsedTime(TimeUnit.MICROSECONDS)); + + if (isLagging) { + // This batch of edits has already been committed on a quorum of other + // nodes. So, we are in "catch up" mode. This gets its own metric. + metrics.batchesWrittenWhileLagging.incr(1); + } + + metrics.batchesWritten.incr(1); + metrics.bytesWritten.incr(records.length); + metrics.txnsWritten.incr(numTxns); + + highestWrittenTxId = lastTxnId; + nextTxId = lastTxnId + 1; + } + + public void heartbeat(RequestInfo reqInfo) throws IOException { + checkRequest(reqInfo); + } + + /** + * Ensure that the given request is coming from the correct writer and in-order. + * @param reqInfo the request info + * @throws IOException if the request is invalid. 
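To make the fencing rules below concrete (all numbers are arbitrary): suppose lastPromisedEpoch is 3 and currentEpochIpcSerial is 7. A request with epoch 2 is rejected outright; a request with epoch 3 and IPC serial 8 is accepted and advances the serial to 8; a request with epoch 3 and IPC serial 7 or lower fails the checkSync() as out of order; and a request with epoch 4 first raises the promise to 4, which resets the serial to -1, so the new writer's serials (counting again from 0) are accepted.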
+ */ + private synchronized void checkRequest(RequestInfo reqInfo) throws IOException { + // Invariant 25 from ZAB paper + if (reqInfo.getEpoch() < lastPromisedEpoch.get()) { + throw new IOException("IPC's epoch " + reqInfo.getEpoch() + + " is less than the last promised epoch " + + lastPromisedEpoch.get()); + } else if (reqInfo.getEpoch() > lastPromisedEpoch.get()) { + // A newer client has arrived. Fence any previous writers by updating + // the promise. + updateLastPromisedEpoch(reqInfo.getEpoch()); + } + + // Ensure that the IPCs are arriving in-order as expected. + checkSync(reqInfo.getIpcSerialNumber() > currentEpochIpcSerial, + "IPC serial %s from client %s was not higher than prior highest " + + "IPC serial %s", reqInfo.getIpcSerialNumber(), + Server.getRemoteIp(), + currentEpochIpcSerial); + currentEpochIpcSerial = reqInfo.getIpcSerialNumber(); + + if (reqInfo.hasCommittedTxId()) { + Preconditions.checkArgument( + reqInfo.getCommittedTxId() >= committedTxnId.get(), + "Client trying to move committed txid backward from " + + committedTxnId.get() + " to " + reqInfo.getCommittedTxId()); + + committedTxnId.set(reqInfo.getCommittedTxId()); + } + } + + private synchronized void checkWriteRequest(RequestInfo reqInfo) throws IOException { + checkRequest(reqInfo); + + if (reqInfo.getEpoch() != lastWriterEpoch.get()) { + throw new IOException("IPC's epoch " + reqInfo.getEpoch() + + " is not the current writer epoch " + + lastWriterEpoch.get()); + } + } + + public synchronized boolean isFormatted() { + return storage.isFormatted(); + } + + private void checkFormatted() throws JournalNotFormattedException { + if (!isFormatted()) { + throw new JournalNotFormattedException("Journal " + + storage.getSingularStorageDir() + " not formatted"); + } + } + + /** + * @throws JournalOutOfSyncException if the given expression is not true. + * The message of the exception is formatted using the 'msg' and + * 'formatArgs' parameters. + */ + private void checkSync(boolean expression, String msg, + Object... formatArgs) throws JournalOutOfSyncException { + if (!expression) { + throw new JournalOutOfSyncException(String.format(msg, formatArgs)); + } + } + + /** + * @throws AssertionError if the given expression is not true. + * The message of the exception is formatted using the 'msg' and + * 'formatArgs' parameters. + * + * This should be used in preference to Java's built-in assert in + * non-performance-critical paths, where a failure of this invariant + * might cause the protocol to lose data. + */ + private void alwaysAssert(boolean expression, String msg, + Object... formatArgs) { + if (!expression) { + throw new AssertionError(String.format(msg, formatArgs)); + } + } + + /** + * Start a new segment at the given txid. The previous segment + * must have already been finalized. + */ + public synchronized void startLogSegment(RequestInfo reqInfo, long txid) + throws IOException { + assert fjm != null; + checkFormatted(); + checkRequest(reqInfo); + + if (curSegment != null) { + LOG.warn("Client is requesting a new log segment " + txid + + " though we are already writing " + curSegment + ". " + + "Aborting the current segment in order to begin the new one."); + // The writer may have lost a connection to us and is now + // re-connecting after the connection came back. + // We should abort our own old segment. + abortCurSegment(); + } + + // Paranoid sanity check: we should never overwrite a finalized log file. + // Additionally, if it's in-progress, it should have at most 1 transaction. 
+ // This can happen if the writer crashes exactly at the start of a segment. + EditLogFile existing = fjm.getLogFile(txid); + if (existing != null) { + if (!existing.isInProgress()) { + throw new IllegalStateException("Already have a finalized segment " + + existing + " beginning at " + txid); + } + + // If it's in-progress, it should only contain one transaction, + // because the "startLogSegment" transaction is written alone at the + // start of each segment. + existing.validateLog(); + if (existing.getLastTxId() != existing.getFirstTxId()) { + throw new IllegalStateException("The log file " + + existing + " seems to contain valid transactions"); + } + } + + long curLastWriterEpoch = lastWriterEpoch.get(); + if (curLastWriterEpoch != reqInfo.getEpoch()) { + LOG.info("Updating lastWriterEpoch from " + curLastWriterEpoch + + " to " + reqInfo.getEpoch() + " for client " + + Server.getRemoteIp()); + lastWriterEpoch.set(reqInfo.getEpoch()); + } + + // The fact that we are starting a segment at this txid indicates + // that any previous recovery for this same segment was aborted. + // Otherwise, no writer would have started writing. So, we can + // remove the record of the older segment here. + purgePaxosDecision(txid); + + curSegment = fjm.startLogSegment(txid); + curSegmentTxId = txid; + nextTxId = txid; + } + + /** + * Finalize the log segment at the given transaction ID. + */ + public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId, + long endTxId) throws IOException { + checkFormatted(); + checkRequest(reqInfo); + + boolean needsValidation = true; + + // Finalizing the log that the writer was just writing. + if (startTxId == curSegmentTxId) { + if (curSegment != null) { + curSegment.close(); + curSegment = null; + curSegmentTxId = HdfsConstants.INVALID_TXID; + } + + checkSync(nextTxId == endTxId + 1, + "Trying to finalize in-progress log segment %s to end at " + + "txid %s but only written up to txid %s", + startTxId, endTxId, nextTxId - 1); + // No need to validate the edit log if the client is finalizing + // the log segment that it was just writing to. + needsValidation = false; + } + + FileJournalManager.EditLogFile elf = fjm.getLogFile(startTxId); + if (elf == null) { + throw new JournalOutOfSyncException("No log file to finalize at " + + "transaction ID " + startTxId); + } + + if (elf.isInProgress()) { + if (needsValidation) { + LOG.info("Validating log segment " + elf.getFile() + " about to be " + + "finalized"); + elf.validateLog(); + + checkSync(elf.getLastTxId() == endTxId, + "Trying to finalize in-progress log segment %s to end at " + + "txid %s but log %s on disk only contains up to txid %s", + startTxId, endTxId, elf.getFile(), elf.getLastTxId()); + } + fjm.finalizeLogSegment(startTxId, endTxId); + } else { + Preconditions.checkArgument(endTxId == elf.getLastTxId(), + "Trying to re-finalize already finalized log " + + elf + " with different endTxId " + endTxId); + } + + // Once logs are finalized, a different length will never be decided. + // During recovery, we treat a finalized segment the same as an accepted + // recovery. Thus, we no longer need to keep track of the previously- + // accepted decision. The existence of the finalized log segment is enough. 
+ purgePaxosDecision(elf.getFirstTxId()); + } + + /** + * @see JournalManager#purgeLogsOlderThan(long) + */ + public synchronized void purgeLogsOlderThan(RequestInfo reqInfo, + long minTxIdToKeep) throws IOException { + checkFormatted(); + checkRequest(reqInfo); + + storage.purgeDataOlderThan(minTxIdToKeep); + } + + /** + * Remove the previously-recorded 'accepted recovery' information + * for a given log segment, once it is no longer necessary. + * @param segmentTxId the transaction ID to purge + * @throws IOException if the file could not be deleted + */ + private void purgePaxosDecision(long segmentTxId) throws IOException { + File paxosFile = storage.getPaxosFile(segmentTxId); + if (paxosFile.exists()) { + if (!paxosFile.delete()) { + throw new IOException("Unable to delete paxos file " + paxosFile); + } + } + } + + /** + * @see QJournalProtocol#getEditLogManifest(String, long) + */ + public RemoteEditLogManifest getEditLogManifest(long sinceTxId) + throws IOException { + // No need to checkRequest() here - anyone may ask for the list + // of segments. + checkFormatted(); + + RemoteEditLogManifest manifest = new RemoteEditLogManifest( + fjm.getRemoteEditLogs(sinceTxId)); + return manifest; + } + + /** + * @return the current state of the given segment, or null if the + * segment does not exist. + */ + private SegmentStateProto getSegmentInfo(long segmentTxId) + throws IOException { + EditLogFile elf = fjm.getLogFile(segmentTxId); + if (elf == null) { + return null; + } + if (elf.isInProgress()) { + elf.validateLog(); + } + if (elf.getLastTxId() == HdfsConstants.INVALID_TXID) { + LOG.info("Edit log file " + elf + " appears to be empty. " + + "Moving it aside..."); + elf.moveAsideEmptyFile(); + return null; + } + SegmentStateProto ret = SegmentStateProto.newBuilder() + .setStartTxId(segmentTxId) + .setEndTxId(elf.getLastTxId()) + .setIsInProgress(elf.isInProgress()) + .build(); + LOG.info("getSegmentInfo(" + segmentTxId + "): " + elf + " -> " + + TextFormat.shortDebugString(ret)); + return ret; + } + + /** + * @see QJournalProtocol#prepareRecovery(RequestInfo, long) + */ + public synchronized PrepareRecoveryResponseProto prepareRecovery( + RequestInfo reqInfo, long segmentTxId) throws IOException { + checkFormatted(); + checkRequest(reqInfo); + + abortCurSegment(); + + PrepareRecoveryResponseProto.Builder builder = + PrepareRecoveryResponseProto.newBuilder(); + + PersistedRecoveryPaxosData previouslyAccepted = getPersistedPaxosData(segmentTxId); + completeHalfDoneAcceptRecovery(previouslyAccepted); + + SegmentStateProto segInfo = getSegmentInfo(segmentTxId); + boolean hasFinalizedSegment = segInfo != null && !segInfo.getIsInProgress(); + + if (previouslyAccepted != null && !hasFinalizedSegment) { + SegmentStateProto acceptedState = previouslyAccepted.getSegmentState(); + assert acceptedState.getEndTxId() == segInfo.getEndTxId() : + "prev accepted: " + TextFormat.shortDebugString(previouslyAccepted)+ "\n" + + "on disk: " + TextFormat.shortDebugString(segInfo); + + builder.setAcceptedInEpoch(previouslyAccepted.getAcceptedInEpoch()) + .setSegmentState(previouslyAccepted.getSegmentState()); + } else { + if (segInfo != null) { + builder.setSegmentState(segInfo); + } + } + + builder.setLastWriterEpoch(lastWriterEpoch.get()); + if (committedTxnId.get() != HdfsConstants.INVALID_TXID) { + builder.setLastCommittedTxId(committedTxnId.get()); + } + + PrepareRecoveryResponseProto resp = builder.build(); + LOG.info("Prepared recovery for segment " + segmentTxId + ": " + + 
TextFormat.shortDebugString(resp)); + return resp; + } + + /** + * @see QJournalProtocol#acceptRecovery(RequestInfo, SegmentStateProto, URL) + */ + public synchronized void acceptRecovery(RequestInfo reqInfo, + SegmentStateProto segment, URL fromUrl) + throws IOException { + checkFormatted(); + checkRequest(reqInfo); + + abortCurSegment(); + + long segmentTxId = segment.getStartTxId(); + + // Basic sanity checks that the segment is well-formed and contains + // at least one transaction. + Preconditions.checkArgument(segment.getEndTxId() > 0 && + segment.getEndTxId() >= segmentTxId, + "bad recovery state for segment %s: %s", + segmentTxId, TextFormat.shortDebugString(segment)); + + PersistedRecoveryPaxosData oldData = getPersistedPaxosData(segmentTxId); + PersistedRecoveryPaxosData newData = PersistedRecoveryPaxosData.newBuilder() + .setAcceptedInEpoch(reqInfo.getEpoch()) + .setSegmentState(segment) + .build(); + + // If we previously acted on acceptRecovery() from a higher-numbered writer, + // this call is out of sync. We should never actually trigger this, since the + // checkRequest() call above should filter non-increasing epoch numbers. + if (oldData != null) { + alwaysAssert(oldData.getAcceptedInEpoch() <= reqInfo.getEpoch(), + "Bad paxos transition, out-of-order epochs.\nOld: %s\nNew: %s\n", + oldData, newData); + } + + File syncedFile = null; + + SegmentStateProto currentSegment = getSegmentInfo(segmentTxId); + if (currentSegment == null || + currentSegment.getEndTxId() != segment.getEndTxId()) { + if (currentSegment == null) { + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + + ": no current segment in place"); + + // Update the highest txid for lag metrics + highestWrittenTxId = Math.max(segment.getEndTxId(), + highestWrittenTxId); + } else { + LOG.info("Synchronizing log " + TextFormat.shortDebugString(segment) + + ": old segment " + TextFormat.shortDebugString(currentSegment) + + " is not the right length"); + + // Paranoid sanity check: if the new log is shorter than the log we + // currently have, we should not end up discarding any transactions + // which are already Committed. + if (txnRange(currentSegment).contains(committedTxnId.get()) && + !txnRange(segment).contains(committedTxnId.get())) { + throw new AssertionError( + "Cannot replace segment " + + TextFormat.shortDebugString(currentSegment) + + " with new segment " + + TextFormat.shortDebugString(segment) + + ": would discard already-committed txn " + + committedTxnId.get()); + } + + // Another paranoid check: we should not be asked to synchronize a log + // on top of a finalized segment. + alwaysAssert(currentSegment.getIsInProgress(), + "Should never be asked to synchronize a different log on top of an " + + "already-finalized segment"); + + // If we're shortening the log, update our highest txid + // used for lag metrics. + if (txnRange(currentSegment).contains(highestWrittenTxId)) { + highestWrittenTxId = segment.getEndTxId(); + } + } + syncedFile = syncLog(reqInfo, segment, fromUrl); + + } else { + LOG.info("Skipping download of log " + + TextFormat.shortDebugString(segment) + + ": already have up-to-date logs"); + } + + // This is one of the few places in the protocol where we have a single + // RPC that results in two distinct actions: + // + // - 1) Downloads the new log segment data (above) + // - 2) Records the new Paxos data about the synchronized segment (below) + // + // These need to be treated as a transaction from the perspective + // of any external process. 
We do this by treating the persistPaxosData() + // success as the "commit" of an atomic transaction. If we fail before + // this point, the downloaded edit log will only exist at a temporary + // path, and thus not change any externally visible state. If we fail + // after this point, then any future prepareRecovery() call will see + // the Paxos data, and by calling completeHalfDoneAcceptRecovery() will + // roll forward the rename of the referenced log file. + // + // See also: HDFS-3955 + // + // The fault points here are exercised by the randomized fault injection + // test case to ensure that this atomic "transaction" operates correctly. + JournalFaultInjector.get().beforePersistPaxosData(); + persistPaxosData(segmentTxId, newData); + JournalFaultInjector.get().afterPersistPaxosData(); + + if (syncedFile != null) { + FileUtil.replaceFile(syncedFile, + storage.getInProgressEditLog(segmentTxId)); + } + + LOG.info("Accepted recovery for segment " + segmentTxId + ": " + + TextFormat.shortDebugString(newData)); + } + + private Range txnRange(SegmentStateProto seg) { + Preconditions.checkArgument(seg.hasEndTxId(), + "invalid segment: %s", seg); + return Ranges.closed(seg.getStartTxId(), seg.getEndTxId()); + } + + /** + * Synchronize a log segment from another JournalNode. The log is + * downloaded from the provided URL into a temporary location on disk, + * which is named based on the current request's epoch. + * + * @return the temporary location of the downloaded file + */ + private File syncLog(RequestInfo reqInfo, + final SegmentStateProto segment, final URL url) throws IOException { + final File tmpFile = storage.getSyncLogTemporaryFile( + segment.getStartTxId(), reqInfo.getEpoch()); + final List localPaths = ImmutableList.of(tmpFile); + + LOG.info("Synchronizing log " + + TextFormat.shortDebugString(segment) + " from " + url); + SecurityUtil.doAsLoginUser( + new PrivilegedExceptionAction() { + @Override + public Void run() throws IOException { + boolean success = false; + try { + TransferFsImage.doGetUrl(url, localPaths, storage, true); + assert tmpFile.exists(); + success = true; + } finally { + if (!success) { + if (!tmpFile.delete()) { + LOG.warn("Failed to delete temporary file " + tmpFile); + } + } + } + return null; + } + }); + return tmpFile; + } + + + /** + * In the case the node crashes in between downloading a log segment + * and persisting the associated paxos recovery data, the log segment + * will be left in its temporary location on disk. Given the paxos data, + * we can check if this was indeed the case, and "roll forward" + * the atomic operation. + * + * See the inline comments in + * {@link #acceptRecovery(RequestInfo, SegmentStateProto, URL)} for more + * details. + * + * @throws IOException if the temporary file is unable to be renamed into + * place + */ + private void completeHalfDoneAcceptRecovery( + PersistedRecoveryPaxosData paxosData) throws IOException { + if (paxosData == null) { + return; + } + + long segmentId = paxosData.getSegmentState().getStartTxId(); + long epoch = paxosData.getAcceptedInEpoch(); + + File tmp = storage.getSyncLogTemporaryFile(segmentId, epoch); + + if (tmp.exists()) { + File dst = storage.getInProgressEditLog(segmentId); + LOG.info("Rolling forward previously half-completed synchronization: " + + tmp + " -> " + dst); + FileUtil.replaceFile(tmp, dst); + } + } + + /** + * Retrieve the persisted data for recovering the given segment from disk. 
+ */ + private PersistedRecoveryPaxosData getPersistedPaxosData(long segmentTxId) + throws IOException { + File f = storage.getPaxosFile(segmentTxId); + if (!f.exists()) { + // Default instance has no fields filled in (they're optional) + return null; + } + + InputStream in = new FileInputStream(f); + try { + PersistedRecoveryPaxosData ret = PersistedRecoveryPaxosData.parseDelimitedFrom(in); + Preconditions.checkState(ret != null && + ret.getSegmentState().getStartTxId() == segmentTxId, + "Bad persisted data for segment %s: %s", + segmentTxId, ret); + return ret; + } finally { + IOUtils.closeStream(in); + } + } + + /** + * Persist data for recovering the given segment from disk. + */ + private void persistPaxosData(long segmentTxId, + PersistedRecoveryPaxosData newData) throws IOException { + File f = storage.getPaxosFile(segmentTxId); + boolean success = false; + AtomicFileOutputStream fos = new AtomicFileOutputStream(f); + try { + newData.writeDelimitedTo(fos); + fos.write('\n'); + // Write human-readable data after the protobuf. This is only + // to assist in debugging -- it's not parsed at all. + OutputStreamWriter writer = new OutputStreamWriter(fos); + + writer.write(String.valueOf(newData)); + writer.write('\n'); + writer.flush(); + + fos.flush(); + success = true; + } finally { + if (success) { + IOUtils.closeStream(fos); + } else { + fos.abort(); + } + } + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalFaultInjector.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Used for injecting faults in QuorumJournalManager tests. + * Calls into this are a no-op in production code. 
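For illustration, a hedged sketch of how a test might hook these injection points; the anonymous subclass below is hypothetical test code, not part of this patch:

    // Assumed to run inside a Journal/QuorumJournalManager test.
    JournalFaultInjector.instance = new JournalFaultInjector() {
      @Override
      public void afterPersistPaxosData() throws java.io.IOException {
        // Simulate a crash after the paxos data is durable but before the
        // downloaded segment is renamed into place; a later prepareRecovery()
        // should then roll the rename forward via
        // completeHalfDoneAcceptRecovery().
        throw new java.io.IOException("injected fault");
      }
    };

Tests that install such an instance should restore the default afterwards so later cases see the normal no-op behavior.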
+ */ +@VisibleForTesting +@InterfaceAudience.Private +public class JournalFaultInjector { + public static JournalFaultInjector instance = new JournalFaultInjector(); + + public static JournalFaultInjector get() { + return instance; + } + + public void beforePersistPaxosData() throws IOException {} + public void afterPersistPaxosData() throws IOException {} +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalMetrics.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.IOException; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableQuantiles; + +/** + * The server-side metrics for a journal from the JournalNode's + * perspective. 
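As a concrete example of what gets registered (the journal id "ns1" is illustrative): this class creates a metrics source named Journal-ns1, and the quantile intervals below (60, 300 and 3600 seconds) yield rolling sync-latency quantiles named syncs60s, syncs300s and syncs3600s, alongside the counters and gauges declared with @Metric.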
+ */ +@Metrics(about="Journal metrics", context="dfs") +class JournalMetrics { + final MetricsRegistry registry = new MetricsRegistry("JournalNode"); + + @Metric("Number of batches written since startup") + MutableCounterLong batchesWritten; + + @Metric("Number of txns written since startup") + MutableCounterLong txnsWritten; + + @Metric("Number of bytes written since startup") + MutableCounterLong bytesWritten; + + @Metric("Number of batches written where this node was lagging") + MutableCounterLong batchesWrittenWhileLagging; + + private final int[] QUANTILE_INTERVALS = new int[] { + 1*60, // 1m + 5*60, // 5m + 60*60 // 1h + }; + + MutableQuantiles[] syncsQuantiles; + + private final Journal journal; + + JournalMetrics(Journal journal) { + this.journal = journal; + + syncsQuantiles = new MutableQuantiles[QUANTILE_INTERVALS.length]; + for (int i = 0; i < syncsQuantiles.length; i++) { + int interval = QUANTILE_INTERVALS[i]; + syncsQuantiles[i] = registry.newQuantiles( + "syncs" + interval + "s", + "Journal sync time", "ops", "latencyMicros", interval); + } + } + + public static JournalMetrics create(Journal j) { + JournalMetrics m = new JournalMetrics(j); + return DefaultMetricsSystem.instance().register( + m.getName(), null, m); + } + + String getName() { + return "Journal-" + journal.getJournalId(); + } + + @Metric("Current writer's epoch") + public long getLastWriterEpoch() { + try { + return journal.getLastWriterEpoch(); + } catch (IOException e) { + return -1L; + } + } + + @Metric("Last accepted epoch") + public long getLastPromisedEpoch() { + try { + return journal.getLastPromisedEpoch(); + } catch (IOException e) { + return -1L; + } + } + + @Metric("The highest txid stored on this JN") + public long getLastWrittenTxId() { + return journal.getHighestWrittenTxId(); + } + + @Metric("Number of transactions that this JN is lagging") + public long getCurrentLagTxns() { + try { + return journal.getCurrentLagTxns(); + } catch (IOException e) { + return -1L; + } + } + + void addSync(long us) { + for (MutableQuantiles q : syncsQuantiles) { + q.add(us); + } + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.server;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.source.JvmMetrics;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * The JournalNode is a daemon which allows namenodes using
+ * the QuorumJournalManager to log and retrieve edits stored
+ * remotely. It is a thin wrapper around a local edit log
+ * directory with the addition of facilities to participate
+ * in the quorum protocol.
+ */
+@InterfaceAudience.Private
+public class JournalNode implements Tool, Configurable {
+  public static final Log LOG = LogFactory.getLog(JournalNode.class);
+  private Configuration conf;
+  private JournalNodeRpcServer rpcServer;
+  private JournalNodeHttpServer httpServer;
+  private Map<String, Journal> journalsById = Maps.newHashMap();
+
+  private File localDir;
+
+  static {
+    HdfsConfiguration.init();
+  }
+
+  /**
+   * When stopped, the daemon will exit with this code.
+   */
+  private int resultCode = 0;
+
+  synchronized Journal getOrCreateJournal(String jid) throws IOException {
+    QuorumJournalManager.checkJournalId(jid);
+
+    Journal journal = journalsById.get(jid);
+    if (journal == null) {
+      File logDir = getLogDir(jid);
+      LOG.info("Initializing journal in directory " + logDir);
+      journal = new Journal(logDir, jid, new ErrorReporter());
+      journalsById.put(jid, journal);
+    }
+
+    return journal;
+  }
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    this.localDir = new File(
+        conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim());
+  }
+
+  private static void validateAndCreateJournalDir(File dir) throws IOException {
+    if (!dir.isAbsolute()) {
+      throw new IllegalArgumentException(
+          "Journal dir '" + dir + "' should be an absolute path");
+    }
+
+    if (!dir.exists() && !dir.mkdirs()) {
+      throw new IOException("Could not create journal dir '" +
+          dir + "'");
+    } else if (!dir.isDirectory()) {
+      throw new IOException("Journal directory '" + dir + "' is not " +
+          "a directory");
+    }
+
+    if (!dir.canWrite()) {
+      throw new IOException("Unable to write to journal dir '" +
+          dir + "'");
+    }
+  }
+
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    start();
+    return join();
+  }
+
+  /**
+   * Start listening for edits via RPC.
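+   *
+   * A rough embedding sketch, e.g. for a test; the edits directory value is
+   * illustrative, and error handling is omitted:
+   * <pre>
+   *   Configuration conf = new HdfsConfiguration();
+   *   conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, "/data/1/jn");
+   *   JournalNode jn = new JournalNode();
+   *   jn.setConf(conf);
+   *   jn.start();
+   *   // ... journals are created lazily as clients contact this node ...
+   *   jn.stopAndJoin(0);
+   * </pre>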
+ */ + public void start() throws IOException { + Preconditions.checkState(!isStarted(), "JN already running"); + + validateAndCreateJournalDir(localDir); + + DefaultMetricsSystem.initialize("JournalNode"); + JvmMetrics.create("JournalNode", + conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY), + DefaultMetricsSystem.instance()); + + InetSocketAddress socAddr = JournalNodeRpcServer.getAddress(conf); + SecurityUtil.login(conf, DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY, + DFSConfigKeys.DFS_JOURNALNODE_USER_NAME_KEY, socAddr.getHostName()); + + httpServer = new JournalNodeHttpServer(conf, this); + httpServer.start(); + + rpcServer = new JournalNodeRpcServer(conf, this); + rpcServer.start(); + } + + public boolean isStarted() { + return rpcServer != null; + } + + /** + * @return the address the IPC server is bound to + */ + public InetSocketAddress getBoundIpcAddress() { + return rpcServer.getAddress(); + } + + + public InetSocketAddress getBoundHttpAddress() { + return httpServer.getAddress(); + } + + + /** + * Stop the daemon with the given status code + * @param rc the status code with which to exit (non-zero + * should indicate an error) + */ + public void stop(int rc) { + this.resultCode = rc; + + if (rpcServer != null) { + rpcServer.stop(); + } + + if (httpServer != null) { + try { + httpServer.stop(); + } catch (IOException ioe) { + LOG.warn("Unable to stop HTTP server for " + this, ioe); + } + } + + for (Journal j : journalsById.values()) { + IOUtils.cleanup(LOG, j); + } + } + + /** + * Wait for the daemon to exit. + * @return the result code (non-zero if error) + */ + int join() throws InterruptedException { + if (rpcServer != null) { + rpcServer.join(); + } + return resultCode; + } + + public void stopAndJoin(int rc) throws InterruptedException { + stop(rc); + join(); + } + + /** + * Return the directory inside our configured storage + * dir which corresponds to a given journal. + * @param jid the journal identifier + * @return the file, which may or may not exist yet + */ + private File getLogDir(String jid) { + String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, + DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT); + Preconditions.checkArgument(jid != null && + !jid.isEmpty(), + "bad journal identifier: %s", jid); + return new File(new File(dir), jid); + } + + + private class ErrorReporter implements StorageErrorReporter { + @Override + public void reportErrorOnFile(File f) { + LOG.fatal("Error reported on file " + f + "... exiting", + new Exception()); + stop(1); + } + } + + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(new JournalNode(), args)); + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import javax.servlet.ServletContext; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.common.JspHelper; +import org.apache.hadoop.http.HttpServer; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.SecurityUtil; + +/** + * Encapsulates the HTTP server started by the Journal Service. + */ +@InterfaceAudience.Private +public class JournalNodeHttpServer { + public static final Log LOG = LogFactory.getLog( + JournalNodeHttpServer.class); + + public static final String JN_ATTRIBUTE_KEY = "localjournal"; + + private HttpServer httpServer; + private int infoPort; + private JournalNode localJournalNode; + + private final Configuration conf; + + JournalNodeHttpServer(Configuration conf, JournalNode jn) { + this.conf = conf; + this.localJournalNode = jn; + } + + void start() throws IOException { + final InetSocketAddress bindAddr = getAddress(conf); + + // initialize the webserver for uploading/downloading files. + LOG.info("Starting web server as: "+ SecurityUtil.getServerPrincipal(conf + .get(DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY), + bindAddr.getHostName())); + + int tmpInfoPort = bindAddr.getPort(); + httpServer = new HttpServer("journal", bindAddr.getHostName(), + tmpInfoPort, tmpInfoPort == 0, conf, new AccessControlList(conf + .get(DFS_ADMIN, " "))) { + { + if (UserGroupInformation.isSecurityEnabled()) { + initSpnego(conf, DFS_JOURNALNODE_INTERNAL_SPNEGO_USER_NAME_KEY, + DFS_JOURNALNODE_KEYTAB_FILE_KEY); + } + } + }; + httpServer.setAttribute(JN_ATTRIBUTE_KEY, localJournalNode); + httpServer.setAttribute(JspHelper.CURRENT_CONF, conf); + httpServer.addInternalServlet("getJournal", "/getJournal", + GetJournalEditServlet.class, true); + httpServer.start(); + + // The web-server port can be ephemeral... ensure we have the correct info + infoPort = httpServer.getPort(); + + LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort); + } + + void stop() throws IOException { + if (httpServer != null) { + try { + httpServer.stop(); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + /** + * Return the actual address bound to by the running server. 
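+   *
+   * Because the configured HTTP port may be ephemeral (port 0), callers
+   * should build URLs from this bound address rather than from the
+   * configuration. A rough sketch (variable names illustrative, query
+   * parameters omitted):
+   * <pre>
+   *   InetSocketAddress http = jnHttpServer.getAddress();
+   *   URL base = new URL("http", http.getHostName(), http.getPort(),
+   *       "/getJournal");
+   * </pre>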
+ */ + public InetSocketAddress getAddress() { + InetSocketAddress addr = httpServer.getListenerAddress(); + assert addr.getPort() != 0; + return addr; + } + + private static InetSocketAddress getAddress(Configuration conf) { + String addr = conf.get(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, + DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_DEFAULT); + return NetUtils.createSocketAddr(addr, + DFSConfigKeys.DFS_JOURNALNODE_HTTP_PORT_DEFAULT, + DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY); + } + + public static Journal getJournalFromContext(ServletContext context, String jid) + throws IOException { + JournalNode jn = (JournalNode)context.getAttribute(JN_ATTRIBUTE_KEY); + return jn.getOrCreateJournal(jid); + } + + public static Configuration getConfFromContext(ServletContext context) { + return (Configuration) context.getAttribute(JspHelper.CURRENT_CONF); + } +} Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java?rev=1417596&view=auto ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java (added) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java Wed Dec 5 19:22:17 2012 @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.server; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URL; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HDFSPolicyProvider; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.QJournalProtocolService; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; +import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; +import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB; +import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; +import org.apache.hadoop.net.NetUtils; + +import com.google.protobuf.BlockingService; + +class JournalNodeRpcServer implements QJournalProtocol { + + private static final int HANDLER_COUNT = 5; + private JournalNode jn; + private Server server; + + JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException { + this.jn = jn; + + Configuration confCopy = new Configuration(conf); + + // Ensure that nagling doesn't kick in, which could cause latency issues. 
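+    // (Nagle's algorithm coalesces small TCP writes and, combined with delayed
+    // ACKs, can stall the small journal RPCs by tens of milliseconds, so
+    // TCP_NODELAY is forced on for this IPC server below.)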
+ confCopy.setBoolean( + CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, + true); + + InetSocketAddress addr = getAddress(confCopy); + RPC.setProtocolEngine(confCopy, QJournalProtocolPB.class, + ProtobufRpcEngine.class); + QJournalProtocolServerSideTranslatorPB translator = + new QJournalProtocolServerSideTranslatorPB(this); + BlockingService service = QJournalProtocolService + .newReflectiveBlockingService(translator); + + this.server = new RPC.Builder(confCopy) + .setProtocol(QJournalProtocolPB.class) + .setInstance(service) + .setBindAddress(addr.getHostName()) + .setPort(addr.getPort()) + .setNumHandlers(HANDLER_COUNT) + .setVerbose(false) + .build(); + + // set service-level authorization security policy + if (confCopy.getBoolean( + CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) { + server.refreshServiceAcl(confCopy, new HDFSPolicyProvider()); + } + } + + void start() { + this.server.start(); + } + + public InetSocketAddress getAddress() { + return server.getListenerAddress(); + } + + void join() throws InterruptedException { + this.server.join(); + } + + void stop() { + this.server.stop(); + } + + static InetSocketAddress getAddress(Configuration conf) { + String addr = conf.get( + DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, + DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_DEFAULT); + return NetUtils.createSocketAddr(addr, 0, + DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY); + } + + @Override + public boolean isFormatted(String journalId) throws IOException { + return jn.getOrCreateJournal(journalId).isFormatted(); + } + + @Override + public GetJournalStateResponseProto getJournalState(String journalId) + throws IOException { + long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch(); + return GetJournalStateResponseProto.newBuilder() + .setLastPromisedEpoch(epoch) + .setHttpPort(jn.getBoundHttpAddress().getPort()) + .build(); + } + + @Override + public NewEpochResponseProto newEpoch(String journalId, + NamespaceInfo nsInfo, + long epoch) throws IOException { + return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch); + } + + @Override + public void format(String journalId, NamespaceInfo nsInfo) + throws IOException { + jn.getOrCreateJournal(journalId).format(nsInfo); + } + + @Override + public void journal(RequestInfo reqInfo, + long segmentTxId, long firstTxnId, + int numTxns, byte[] records) throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records); + } + + @Override + public void heartbeat(RequestInfo reqInfo) throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .heartbeat(reqInfo); + } + + @Override + public void startLogSegment(RequestInfo reqInfo, long txid) + throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .startLogSegment(reqInfo, txid); + } + + @Override + public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, + long endTxId) throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .finalizeLogSegment(reqInfo, startTxId, endTxId); + } + + @Override + public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) + throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .purgeLogsOlderThan(reqInfo, minTxIdToKeep); + } + + @Override + public GetEditLogManifestResponseProto getEditLogManifest(String jid, + long sinceTxId) throws IOException { + + RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) + .getEditLogManifest(sinceTxId); + + return 
GetEditLogManifestResponseProto.newBuilder() + .setManifest(PBHelper.convert(manifest)) + .setHttpPort(jn.getBoundHttpAddress().getPort()) + .build(); + } + + @Override + public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, + long segmentTxId) throws IOException { + return jn.getOrCreateJournal(reqInfo.getJournalId()) + .prepareRecovery(reqInfo, segmentTxId); + } + + @Override + public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log, + URL fromUrl) throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .acceptRecovery(reqInfo, log, fromUrl); + } + +} Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1417596&r1=1417595&r2=1417596&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Wed Dec 5 19:22:17 2012 @@ -39,7 +39,8 @@ public final class HdfsServerConstants { */ static public enum NodeType { NAME_NODE, - DATA_NODE; + DATA_NODE, + JOURNAL_NODE; } /** Startup options */ Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1417596&r1=1417595&r2=1417596&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Wed Dec 5 19:22:17 2012 @@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.VersionInfo; +import com.google.common.base.Preconditions; + /** @@ -76,7 +78,7 @@ public abstract class Storage extends St /** Layout versions of 0.20.203 release */ public static final int[] LAYOUT_VERSIONS_203 = {-19, -31}; - private static final String STORAGE_FILE_LOCK = "in_use.lock"; + public static final String STORAGE_FILE_LOCK = "in_use.lock"; protected static final String STORAGE_FILE_VERSION = "VERSION"; public static final String STORAGE_DIR_CURRENT = "current"; public static final String STORAGE_DIR_PREVIOUS = "previous"; @@ -752,6 +754,15 @@ public abstract class Storage extends St return storageDirs.get(idx); } + /** + * @return the storage directory, with the precondition that this storage + * has exactly one storage directory + */ + public StorageDirectory getSingularStorageDir() { + Preconditions.checkState(storageDirs.size() == 1); + return storageDirs.get(0); + } + protected void addStorageDir(StorageDirectory sd) { storageDirs.add(sd); } Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1417596&r1=1417595&r2=1417596&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original) +++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Wed Dec 5 19:22:17 2012 @@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends } @Override // EditLogOutputStream - protected void flushAndSync() throws IOException { + protected void flushAndSync(boolean durable) throws IOException { assert out.getLength() == 0 : "Output buffer is not empty"; int numReadyTxns = doubleBuf.countReadyTxns();