Return-Path: X-Original-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-hdfs-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 45868CF5F for ; Fri, 20 Jul 2012 00:26:19 +0000 (UTC) Received: (qmail 39559 invoked by uid 500); 20 Jul 2012 00:26:19 -0000 Delivered-To: apmail-hadoop-hdfs-commits-archive@hadoop.apache.org Received: (qmail 39520 invoked by uid 500); 20 Jul 2012 00:26:19 -0000 Mailing-List: contact hdfs-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hdfs-dev@hadoop.apache.org Delivered-To: mailing list hdfs-commits@hadoop.apache.org Received: (qmail 39511 invoked by uid 99); 20 Jul 2012 00:26:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jul 2012 00:26:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Jul 2012 00:26:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 771A423889E7; Fri, 20 Jul 2012 00:25:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1363596 [3/3] - in /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs... Date: Fri, 20 Jul 2012 00:25:52 -0000 To: hdfs-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120720002555.771A423889E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1363596&r1=1363595&r2=1363596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Fri Jul 20 00:25:50 2012 @@ -17,18 +17,15 @@ */ package org.apache.hadoop.hdfs.server.protocol; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.io.Writable; import com.google.common.base.Function; import com.google.common.collect.ComparisonChain; -public class RemoteEditLog implements Writable, Comparable { +public class RemoteEditLog implements Comparable { private long startTxId = HdfsConstants.INVALID_TXID; private long endTxId = HdfsConstants.INVALID_TXID; + private boolean isInProgress = false; public RemoteEditLog() { } @@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr public RemoteEditLog(long startTxId, long endTxId) { this.startTxId = startTxId; this.endTxId = endTxId; + this.isInProgress = (endTxId == 
HdfsConstants.INVALID_TXID); + } + + public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) { + this.startTxId = startTxId; + this.endTxId = endTxId; + this.isInProgress = inProgress; } public long getStartTxId() { @@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr public long getEndTxId() { return endTxId; } - - @Override - public String toString() { - return "[" + startTxId + "," + endTxId + "]"; - } - @Override - public void write(DataOutput out) throws IOException { - out.writeLong(startTxId); - out.writeLong(endTxId); + public boolean isInProgress() { + return isInProgress; } @Override - public void readFields(DataInput in) throws IOException { - startTxId = in.readLong(); - endTxId = in.readLong(); + public String toString() { + if (!isInProgress) { + return "[" + startTxId + "," + endTxId + "]"; + } else { + return "[" + startTxId + "-? (in-progress)]"; + } } @Override Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto Fri Jul 20 00:25:50 2012 @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hdfs.qjournal.protocol"; +option java_outer_classname = "QJournalProtocolProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +import "hdfs.proto"; + +message JournalIdProto { + required string identifier = 1; +} + +message RequestInfoProto { + required JournalIdProto journalId = 1; + required uint64 epoch = 2; + required uint64 ipcSerialNumber = 3; +} + +message SegmentStateProto { + required uint64 startTxId = 1; + required uint64 endTxId = 2; + required bool isInProgress = 3; + required bytes md5sum = 4; +} + +/** + * The storage format used on local disk for previously + * accepted decisions. 
+ */ +message PersistedRecoveryPaxosData { + required SegmentStateProto segmentState = 1; + required uint64 acceptedInEpoch = 2; +} + +/** + * journal() + */ + +message JournalRequestProto { + required RequestInfoProto reqInfo = 1; + required uint64 firstTxnId = 2; + required uint32 numTxns = 3; + required bytes records = 4; +} + +message JournalResponseProto { +} + +/** + * startLogSegment() + */ +message StartLogSegmentRequestProto { + required RequestInfoProto reqInfo = 1; + required uint64 txid = 2; // Transaction ID +} + +message StartLogSegmentResponseProto { +} + +/** + * finalizeLogSegment() + */ +message FinalizeLogSegmentRequestProto { + required RequestInfoProto reqInfo = 1; + required uint64 startTxId = 2; + required uint64 endTxId = 3; +} + +message FinalizeLogSegmentResponseProto { +} + +/** + * getJournalState() + */ +message GetJournalStateRequestProto { + required JournalIdProto jid = 1; +} + +message GetJournalStateResponseProto { + required uint64 lastPromisedEpoch = 1; + required uint32 httpPort = 2; +} + +/** + * newEpoch() + */ +message NewEpochRequestProto { + required JournalIdProto jid = 1; + required NamespaceInfoProto nsInfo = 2; + required uint64 epoch = 3; +} + +message NewEpochResponseProto { + optional uint64 lastSegmentTxId = 1; +} + +/** + * getEditLogManifest() + */ +message GetEditLogManifestRequestProto { + required JournalIdProto jid = 1; + required uint64 sinceTxId = 2; // Transaction ID +} + +message GetEditLogManifestResponseProto { + required RemoteEditLogManifestProto manifest = 1; + required uint32 httpPort = 2; + + // TODO: we should add nsinfo somewhere + // to verify that it matches up with our expectation + // required NamespaceInfoProto nsInfo = 2; +} + +/** + * prepareRecovery() + */ +message PrepareRecoveryRequestProto { + required RequestInfoProto reqInfo = 1; + required uint64 segmentTxId = 2; +} + +message PrepareRecoveryResponseProto { + optional SegmentStateProto segmentState = 1; + optional uint64 acceptedInEpoch = 2; +} + +/** + * acceptRecovery() + */ +message AcceptRecoveryRequestProto { + required RequestInfoProto reqInfo = 1; + + /** Details on the segment to recover */ + required SegmentStateProto stateToAccept = 2; + + /** The URL from which the log may be copied */ + required string fromURL = 3; +} + +message AcceptRecoveryResponseProto { +} + + +/** + * Protocol used to journal edits to a JournalNode. + * See the request and response for details of rpc call. 
+ */ +service QJournalProtocolService { + rpc getJournalState(GetJournalStateRequestProto) returns (GetJournalStateResponseProto); + + rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto); + + rpc journal(JournalRequestProto) returns (JournalResponseProto); + + rpc startLogSegment(StartLogSegmentRequestProto) + returns (StartLogSegmentResponseProto); + + rpc finalizeLogSegment(FinalizeLogSegmentRequestProto) + returns (FinalizeLogSegmentResponseProto); + + rpc getEditLogManifest(GetEditLogManifestRequestProto) + returns (GetEditLogManifestResponseProto); + + rpc prepareRecovery(PrepareRecoveryRequestProto) + returns (PrepareRecoveryResponseProto); + + rpc acceptRecovery(AcceptRecoveryRequestProto) + returns (AcceptRecoveryResponseProto); +} Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1363596&r1=1363595&r2=1363596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Fri Jul 20 00:25:50 2012 @@ -290,6 +290,7 @@ message BlocksWithLocationsProto { message RemoteEditLogProto { required uint64 startTxId = 1; // Starting available edit log transaction required uint64 endTxId = 2; // Ending available edit log transaction + optional bool isInProgress = 3 [default = false]; } /** Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1363596&r1=1363595&r2=1363596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Jul 20 00:25:50 2012 @@ -241,6 +241,11 @@ + dfs.namenode.edits.journal-plugin.qjournal + org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager + + + dfs.permissions.enabled true Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/index.html Fri Jul 20 00:25:50 2012 @@ -0,0 +1,29 @@ + + + +Hadoop Administration + + +

+Hadoop Administration

+ + + + + Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/journal/journalstatus.jsp Fri Jul 20 00:25:50 2012 @@ -0,0 +1,42 @@ +<% +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +%> +<%@ page + contentType="text/html; charset=UTF-8" + import="org.apache.hadoop.hdfs.server.common.JspHelper" + import="org.apache.hadoop.util.ServletUtil" +%> +<%! + //for java.io.Serializable + private static final long serialVersionUID = 1L; +%> + + + + +Hadoop JournalNode + + +

+JournalNode

+<%= JspHelper.getVersionTable() %> +
+ +
+Logs +<%= ServletUtil.htmlFooter() %> Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/proto-journal-web.xml Fri Jul 20 00:25:50 2012 @@ -0,0 +1,17 @@ + + + +@journal.servlet.definitions@ + Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1363596&r1=1363595&r2=1363596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Jul 20 00:25:50 2012 @@ -85,6 +85,7 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.VersionInfo; +import com.google.common.base.Charsets; import com.google.common.base.Joiner; /** Utilities for HDFS tests */ @@ -586,12 +587,21 @@ public class DFSTestUtil { IOUtils.copyBytes(is, os, s.length(), true); } - // Returns url content as string. + /** + * @return url content as string (UTF-8 encoding assumed) + */ public static String urlGet(URL url) throws IOException { + return new String(urlGetBytes(url), Charsets.UTF_8); + } + + /** + * @return URL contents as a byte array + */ + public static byte[] urlGetBytes(URL url) throws IOException { URLConnection conn = url.openConnection(); ByteArrayOutputStream out = new ByteArrayOutputStream(); IOUtils.copyBytes(conn.getInputStream(), out, 4096, true); - return out.toString(); + return out.toByteArray(); } /** Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +public class MiniJournalCluster { + public static class Builder { + private String baseDir; + private int numJournalNodes = 3; + private boolean format = true; + private Configuration conf; + + public Builder(Configuration conf) { + this.conf = conf; + } + + public Builder baseDir(String d) { + this.baseDir = d; + return this; + } + + public Builder numJournalNodes(int n) { + this.numJournalNodes = n; + return this; + } + + public Builder format(boolean f) { + this.format = f; + return this; + } + + public MiniJournalCluster build() throws IOException { + return new MiniJournalCluster(this); + } + } + + private static final Log LOG = LogFactory.getLog(MiniJournalCluster.class); + private File baseDir; + private JournalNode nodes[]; + private InetSocketAddress ipcAddrs[]; + private InetSocketAddress httpAddrs[]; + + private MiniJournalCluster(Builder b) throws IOException { + LOG.info("Starting MiniJournalCluster with " + + b.numJournalNodes + " journal nodes"); + + if (b.baseDir != null) { + this.baseDir = new File(b.baseDir); + } else { + this.baseDir = new File(MiniDFSCluster.getBaseDirectory()); + } + + nodes = new JournalNode[b.numJournalNodes]; + ipcAddrs = new InetSocketAddress[b.numJournalNodes]; + httpAddrs = new InetSocketAddress[b.numJournalNodes]; + for (int i = 0; i < b.numJournalNodes; i++) { + if (b.format) { + File dir = getStorageDir(i); + LOG.debug("Fully deleting JN directory " + dir); + FileUtil.fullyDelete(dir); + } + nodes[i] = new JournalNode(); + nodes[i].setConf(createConfForNode(b, i)); + nodes[i].start(); + + ipcAddrs[i] = nodes[i].getBoundIpcAddress(); + httpAddrs[i] = nodes[i].getBoundHttpAddress(); + } + } + + /** + * Set up the given Configuration object to point to the set of JournalNodes + * in this cluster. + */ + public URI getQuorumJournalURI(String jid) { + List addrs = Lists.newArrayList(); + for (InetSocketAddress addr : ipcAddrs) { + addrs.add("127.0.0.1:" + addr.getPort()); + } + String addrsVal = Joiner.on(";").join(addrs); + LOG.debug("Setting logger addresses to: " + addrsVal); + try { + return new URI("qjournal://" + addrsVal + "/" + jid); + } catch (URISyntaxException e) { + throw new AssertionError(e); + } + } + + /** + * Start the JournalNodes in the cluster. + */ + public void start() throws IOException { + for (JournalNode jn : nodes) { + jn.start(); + } + } + + /** + * Shutdown all of the JournalNodes in the cluster. 
+ * @throws IOException if one or more nodes failed to stop + */ + public void shutdown() throws IOException { + boolean failed = false; + for (JournalNode jn : nodes) { + try { + jn.stopAndJoin(0); + } catch (Exception e) { + failed = true; + LOG.warn("Unable to stop journal node " + jn, e); + } + } + if (failed) { + throw new IOException("Unable to shut down. Check log for details"); + } + } + + private Configuration createConfForNode(Builder b, int idx) { + Configuration conf = new Configuration(b.conf); + File logDir = getStorageDir(idx); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY, logDir.toString()); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + return conf; + } + + public File getStorageDir(int idx) { + return new File(baseDir, "journalnode-" + idx); + } + + public File getCurrentDir(int idx, String jid) { + return new File(new File(getStorageDir(idx), jid), "current"); + } + + public JournalNode getJournalNode(int i) { + return nodes[i]; + } + + public void restartJournalNode(int i) throws InterruptedException, IOException { + Configuration conf = new Configuration(nodes[i].getConf()); + if (nodes[i].isStarted()) { + nodes[i].stopAndJoin(0); + } + + conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "127.0.0.1:" + + ipcAddrs[i].getPort()); + conf.set(DFSConfigKeys.DFS_JOURNALNODE_HTTP_ADDRESS_KEY, "127.0.0.1:" + + httpAddrs[i].getPort()); + + JournalNode jn = new JournalNode(); + jn.setConf(conf); + jn.start(); + } + + public int getQuorumSize() { + return nodes.length / 2 + 1; + } + + public int getNumNodes() { + return nodes.length; + } + +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/QJMTestUtil.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal; + +import java.util.Arrays; + +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.io.DataOutputBuffer; + +public abstract class QJMTestUtil { + + public static byte[] createTxnData(int startTxn, int numTxns) throws Exception { + DataOutputBuffer buf = new DataOutputBuffer(); + FSEditLogOp.Writer writer = new FSEditLogOp.Writer(buf); + + for (long txid = startTxn; txid < startTxn + numTxns; txid++) { + FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid); + op.setTransactionId(txid); + writer.writeOp(op); + } + + return Arrays.copyOf(buf.getData(), buf.getLength()); + } + +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestMiniJournalCluster.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; +import org.junit.Test; + + +public class TestMiniJournalCluster { + @Test + public void testStartStop() throws IOException { + Configuration conf = new Configuration(); + MiniJournalCluster c = new MiniJournalCluster.Builder(conf) + .build(); + try { + URI uri = c.getQuorumJournalURI("myjournal"); + String[] addrs = uri.getAuthority().split(";"); + assertEquals(3, addrs.length); + + JournalNode node = c.getJournalNode(0); + String dir = node.getConf().get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY); + assertEquals(MiniDFSCluster.getBaseDirectory() + "journalnode-0", + dir); + } finally { + c.shutdown(); + } + } +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.ExitUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestNNWithQJM { + Configuration conf = new HdfsConfiguration(); + private MiniJournalCluster mjc; + private Path TEST_PATH = new Path("/test-dir"); + private Path TEST_PATH_2 = new Path("/test-dir"); + + @Before + public void resetSystemExit() { + ExitUtil.resetFirstExitException(); + } + + @Before + public void startJNs() throws Exception { + mjc = new MiniJournalCluster.Builder(conf).build(); + } + + @After + public void stopJNs() throws Exception { + if (mjc != null) { + mjc.shutdown(); + } + } + + @Test + public void testLogAndRestart() throws IOException { + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image"); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, + mjc.getQuorumJournalURI("myjournal").toString()); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .manageNameDfsDirs(false) + .build(); + try { + cluster.getFileSystem().mkdirs(TEST_PATH); + + // Restart the NN and make sure the edit was persisted + // and loaded again + cluster.restartNameNode(); + + assertTrue(cluster.getFileSystem().exists(TEST_PATH)); + cluster.getFileSystem().mkdirs(TEST_PATH_2); + + // Restart the NN again and make sure both edits are persisted. + cluster.restartNameNode(); + assertTrue(cluster.getFileSystem().exists(TEST_PATH)); + assertTrue(cluster.getFileSystem().exists(TEST_PATH_2)); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testNewNamenodeTakesOverWriter() throws Exception { + File nn1Dir = new File( + MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn1"); + File nn2Dir = new File( + MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image-nn2"); + + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + nn1Dir.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, + mjc.getQuorumJournalURI("myjournal").toString()); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .manageNameDfsDirs(false) + .checkExitOnShutdown(false) + .build(); + + try { + cluster.getFileSystem().mkdirs(TEST_PATH); + + // Start a second NN pointed to the same quorum. + // We need to copy the image dir from the first NN -- or else + // the new NN will just be rejected because of Namespace mismatch. 
+ FileUtil.fullyDelete(nn2Dir); + FileUtil.copy(nn1Dir, FileSystem.getLocal(conf).getRaw(), + new Path(nn2Dir.getAbsolutePath()), false, conf); + + Configuration conf2 = new Configuration(); + conf2.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + nn2Dir.getAbsolutePath()); + conf2.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, + mjc.getQuorumJournalURI("myjournal").toString()); + MiniDFSCluster cluster2 = new MiniDFSCluster.Builder(conf2) + .numDataNodes(0) + .format(false) + .manageNameDfsDirs(false) + .build(); + + // Check that the new cluster sees the edits made on the old cluster + try { + assertTrue(cluster2.getFileSystem().exists(TEST_PATH)); + } finally { + cluster2.shutdown(); + } + + // Check that, if we try to write to the old NN + // that it aborts. + try { + cluster.getFileSystem().mkdirs(new Path("/x")); + fail("Did not abort trying to write to a fenced NN"); + } catch (RemoteException re) { + GenericTestUtils.assertExceptionContains( + "Could not sync enough journals to persistent storage", re); + } + } finally { + //cluster.shutdown(); + } + } + + @Test + public void testMismatchedNNIsRejected() throws Exception { + conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, + MiniDFSCluster.getBaseDirectory() + "/TestNNWithQJM/image"); + conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, + mjc.getQuorumJournalURI("myjournal").toString()); + + // Start a NN, so the storage is formatted with its namespace info. + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .manageNameDfsDirs(false) + .build(); + cluster.shutdown(); + + // Create a new (freshly-formatted) NN, which should not be able to + // reuse the same journal, since its journal ID would not match. + try { + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(0) + .manageNameDfsDirs(false) + .build(); + fail("New NN with different namespace should have been rejected"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Unable to start log segment 1: too few journals", ioe); + } + } +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster.Builder; +import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; +import org.apache.hadoop.hdfs.qjournal.client.AsyncLoggerSet; +import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + + +public class TestEpochsAreUnique { + private static final Log LOG = LogFactory.getLog(TestEpochsAreUnique.class); + private static final String JID = "testEpochsAreUnique-jid"; + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + private Random r = new Random(); + + @Test + public void testSingleThreaded() throws IOException { + Configuration conf = new Configuration(); + MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build(); + URI uri = cluster.getQuorumJournalURI(JID); + try { + // With no failures or contention, epochs should increase one-by-one + for (int i = 0; i < 5; i++) { + AsyncLoggerSet als = new AsyncLoggerSet( + QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO)); + als.createNewUniqueEpoch(FAKE_NSINFO); + assertEquals(i + 1, als.getEpoch()); + } + + long prevEpoch = 5; + // With some failures injected, it should still always increase, perhaps + // skipping some + for (int i = 0; i < 20; i++) { + AsyncLoggerSet als = new AsyncLoggerSet( + makeFaulty(QuorumJournalManager.createLoggers(conf, uri, FAKE_NSINFO))); + long newEpoch = -1; + while (true) { + try { + als.createNewUniqueEpoch(FAKE_NSINFO); + newEpoch = als.getEpoch(); + break; + } catch (IOException ioe) { + // It's OK to fail to create an epoch, since we randomly inject + // faults. 
It's possible we'll inject faults in too many of the + // underlying nodes, and a failure is expected in that case + } + } + LOG.info("Created epoch " + newEpoch); + assertTrue("New epoch " + newEpoch + " should be greater than previous " + + prevEpoch, newEpoch > prevEpoch); + prevEpoch = newEpoch; + } + } finally { + cluster.shutdown(); + } + } + + + private List makeFaulty(List loggers) { + List ret = Lists.newArrayList(); + for (AsyncLogger l : loggers) { + AsyncLogger spy = Mockito.spy(l); + Mockito.doAnswer(new SometimesFaulty(0.10f)) + .when(spy).getJournalState(); + Mockito.doAnswer(new SometimesFaulty(0.40f)) + .when(spy).newEpoch(Mockito.anyLong()); + ret.add(spy); + } + return ret; + } + + private class SometimesFaulty implements Answer> { + private float faultProbability; + + public SometimesFaulty(float faultProbability) { + this.faultProbability = faultProbability; + } + + @SuppressWarnings("unchecked") + @Override + public ListenableFuture answer(InvocationOnMock invocation) + throws Throwable { + if (r.nextFloat() < faultProbability) { + return Futures.immediateFailedFuture( + new IOException("Injected fault")); + } + return (ListenableFuture)invocation.callRealMethod(); + } + } + + + +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestIPCLoggerChannel.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; +import org.apache.hadoop.hdfs.qjournal.client.LoggerTooFarBehindException; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; +import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.base.Supplier; + +public class TestIPCLoggerChannel { + private static final Log LOG = LogFactory.getLog( + TestIPCLoggerChannel.class); + + private Configuration conf = new Configuration(); + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + private static final String JID = "test-journalid"; + private static final InetSocketAddress FAKE_ADDR = + new InetSocketAddress(0); + private static final byte[] FAKE_DATA = new byte[4096]; + + private QJournalProtocol mockProxy = Mockito.mock(QJournalProtocol.class); + private IPCLoggerChannel ch; + + private static final int LIMIT_QUEUE_SIZE_MB = 1; + private static final int LIMIT_QUEUE_SIZE_BYTES = + LIMIT_QUEUE_SIZE_MB * 1024 * 1024; + + @Before + public void setupMock() { + conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, + LIMIT_QUEUE_SIZE_MB); + + // Channel to the mock object instead of a real IPC proxy. + ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, FAKE_ADDR) { + @Override + protected QJournalProtocol getProxy() throws IOException { + return mockProxy; + } + }; + + ch.setEpoch(1); + } + + @Test + public void testSimpleCall() throws Exception { + ch.sendEdits(1, 3, FAKE_DATA).get(); + Mockito.verify(mockProxy).journal(Mockito.any(), + Mockito.eq(1L), Mockito.eq(3), Mockito.same(FAKE_DATA)); + } + + + /** + * Test that, once the queue eclipses the configure size limit, + * calls to journal more data are rejected. + */ + @Test + public void testQueueLimiting() throws Exception { + + // Block the underlying fake proxy from actually completing any calls. + DelayAnswer delayer = new DelayAnswer(LOG); + Mockito.doAnswer(delayer).when(mockProxy).journal( + Mockito.any(), + Mockito.eq(1L), Mockito.eq(1), Mockito.same(FAKE_DATA)); + + // Queue up the maximum number of calls. + int numToQueue = LIMIT_QUEUE_SIZE_BYTES / FAKE_DATA.length; + for (int i = 1; i <= numToQueue; i++) { + ch.sendEdits((long)i, 1, FAKE_DATA); + } + + // The accounting should show the correct total number queued. + assertEquals(LIMIT_QUEUE_SIZE_BYTES, ch.getQueuedEditsSize()); + + // Trying to queue any more should fail. 
+ try { + ch.sendEdits(numToQueue + 1, 1, FAKE_DATA).get(1, TimeUnit.SECONDS); + fail("Did not fail to queue more calls after queue was full"); + } catch (ExecutionException ee) { + if (!(ee.getCause() instanceof LoggerTooFarBehindException)) { + throw ee; + } + } + + delayer.proceed(); + + // After we allow it to proceeed, it should chug through the original queue + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return ch.getQueuedEditsSize() == 0; + } + }, 10, 1000); + } + +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumCall.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.*; + +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hdfs.qjournal.client.QuorumCall; +import org.junit.Test; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.SettableFuture; + +public class TestQuorumCall { + @Test(timeout=10000) + public void testQuorums() throws Exception { + Map> futures = ImmutableMap.of( + "f1", SettableFuture.create(), + "f2", SettableFuture.create(), + "f3", SettableFuture.create()); + + QuorumCall q = QuorumCall.create(futures); + assertEquals(0, q.countResponses()); + + futures.get("f1").set("first future"); + q.waitFor(1, 0, 0, 100000); // wait for 1 response + q.waitFor(0, 1, 0, 100000); // wait for 1 success + assertEquals(1, q.countResponses()); + + + futures.get("f2").setException(new Exception("error")); + assertEquals(2, q.countResponses()); + + futures.get("f3").set("second future"); + q.waitFor(3, 0, 100, 100000); // wait for 3 responses + q.waitFor(0, 2, 100, 100000); // 2 successes + + assertEquals(3, q.countResponses()); + assertEquals("f1=first future,f3=second future", + Joiner.on(",").withKeyValueSeparator("=").join( + new TreeMap(q.getResults()))); + + try { + q.waitFor(0, 4, 100, 10); + fail("Didn't time out waiting for more responses than came back"); + } catch (TimeoutException te) { + // expected + } + } +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.qjournal.MiniJournalCluster; +import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; +import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; +import org.apache.hadoop.hdfs.qjournal.client.QuorumException; +import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.NNStorage; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Functional tests for QuorumJournalManager. + * For true unit tests, see {@link TestQuorumJournalManagerUnit}. + */ +public class TestQuorumJournalManager { + private static final Log LOG = LogFactory.getLog( + TestQuorumJournalManager.class); + + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + private static final String JID = "testQuorumJournalManager"; + private MiniJournalCluster cluster; + private Configuration conf; + private QuorumJournalManager qjm; + private List spies; + + @Before + public void setup() throws Exception { + conf = new Configuration(); + cluster = new MiniJournalCluster.Builder(conf) + .build(); + + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + + qjm.recoverUnfinalizedSegments(); + assertEquals(1, qjm.getLoggerSetForTests().getEpoch()); + } + + @After + public void shutdown() throws IOException { + cluster.shutdown(); + } + + @Test + public void testSingleWriter() throws Exception { + writeSegment(qjm, 1, 3, true); + + // Should be finalized + checkRecovery(cluster, 1, 3); + + // Start a new segment + writeSegment(qjm, 4, 1, true); + + // Should be finalized + checkRecovery(cluster, 4, 4); + } + + @Test + public void testOrchestratedFailures() throws Exception { + writeSegment(qjm, 1, 3, true); + writeSegment(qjm, 4, 3, true); + + SortedSet serials = Sets.newTreeSet(); + for (AsyncLogger l : qjm.getLoggerSetForTests().getLoggersForTests()) { + IPCLoggerChannel ch = (IPCLoggerChannel)l; + ch.waitForAllPendingCalls(); + serials.add(ch.getNextIpcSerial()); + } + + // All of the loggers should have sent the same number of RPCs, since there + // were no failures. + assertEquals(1, serials.size()); + + long maxSerial = serials.first(); + LOG.info("Max IPC serial = " + maxSerial); + + cluster.shutdown(); + + cluster = new MiniJournalCluster.Builder(conf) + .build(); + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + + } + + /** + * Test case where a new writer picks up from an old one with no failures + * and the previous unfinalized segment entirely consistent -- i.e. 
all + * the JournalNodes end at the same transaction ID. + */ + @Test + public void testChangeWritersLogsInSync() throws Exception { + writeSegment(qjm, 1, 3, false); + assertExistsInQuorum(cluster, + NNStorage.getInProgressEditsFileName(1)); + + // Make a new QJM + qjm = new QuorumJournalManager( + conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO); + qjm.recoverUnfinalizedSegments(); + checkRecovery(cluster, 1, 3); + } + + /** + * Test case where a new writer picks up from an old one which crashed + * with the three loggers at different txnids + */ + @Test + public void testChangeWritersLogsOutOfSync1() throws Exception { + // Journal states: [3, 4, 5] + // During recovery: [x, 4, 5] + // Should recovery to txn 5 + doOutOfSyncTest(0, 5L); + } + + @Test + public void testChangeWritersLogsOutOfSync2() throws Exception { + // Journal states: [3, 4, 5] + // During recovery: [3, x, 5] + // Should recovery to txn 5 + doOutOfSyncTest(1, 5L); + } + + @Test + public void testChangeWritersLogsOutOfSync3() throws Exception { + // Journal states: [3, 4, 5] + // During recovery: [3, 4, x] + // Should recovery to txn 4 + doOutOfSyncTest(2, 4L); + } + + + private void doOutOfSyncTest(int missingOnRecoveryIdx, + long expectedRecoveryTxnId) throws Exception { + EditLogOutputStream stm = qjm.startLogSegment(1); + + failLoggerAtTxn(spies.get(0), 4); + failLoggerAtTxn(spies.get(1), 5); + + writeTxns(stm, 1, 3); + + // This should succeed to 2/3 loggers + writeTxns(stm, 4, 1); + + // This should only succeed to 1 logger (index 2). Hence it should + // fail + try { + writeTxns(stm, 5, 1); + fail("Did not fail to write when only a minority succeeded"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains( + "too many exceptions to achieve quorum size 2/3", + qe); + } + + assertExistsInQuorum(cluster, + NNStorage.getInProgressEditsFileName(1)); + + // Shut down the specified JN, so it's not present during recovery. + cluster.getJournalNode(missingOnRecoveryIdx).stopAndJoin(0); + + // Make a new QJM + qjm = createSpyingQJM(); + + qjm.recoverUnfinalizedSegments(); + checkRecovery(cluster, 1, expectedRecoveryTxnId); + } + + + private void failLoggerAtTxn(AsyncLogger spy, long txid) { + TestQuorumJournalManagerUnit.futureThrows(new IOException("mock failure")) + .when(spy).sendEdits( + Mockito.eq(txid), Mockito.eq(1), Mockito.any()); + } + + /** + * edit lengths [3,4,5] + * first recovery: + * - sees [3,4,x] + * - picks length 4 for recoveryEndTxId + * - calls acceptRecovery() + * - crashes before finalizing + * second recovery: + * - sees [x, 4, 5] + * - should pick recovery length 4, even though it saw + * a larger txid, because a previous recovery accepted it + */ + @Test + public void testRecoverAfterIncompleteRecovery() throws Exception { + EditLogOutputStream stm = qjm.startLogSegment(1); + + failLoggerAtTxn(spies.get(0), 4); + failLoggerAtTxn(spies.get(1), 5); + + writeTxns(stm, 1, 3); + + // This should succeed to 2/3 loggers + writeTxns(stm, 4, 1); + + // This should only succeed to 1 logger (index 2). 
Hence it should + // fail + try { + writeTxns(stm, 5, 1); + fail("Did not fail to write when only a minority succeeded"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains( + "too many exceptions to achieve quorum size 2/3", + qe); + } + + // Shut down the logger that has length = 5 + cluster.getJournalNode(2).stopAndJoin(0); + + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + + // Allow no logger to finalize + for (AsyncLogger spy : spies) { + TestQuorumJournalManagerUnit.futureThrows(new IOException("injected")) + .when(spy).finalizeLogSegment(Mockito.eq(1L), + Mockito.eq(4L)); + } + try { + qjm.recoverUnfinalizedSegments(); + fail("Should have failed recovery since no finalization occurred"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains("injected", ioe); + } + + // Now bring back the logger that had 5, and run recovery again. + // We should recover to 4, even though there's a longer log. + cluster.getJournalNode(0).stopAndJoin(0); + cluster.restartJournalNode(2); + + qjm = createSpyingQJM(); + spies = qjm.getLoggerSetForTests().getLoggersForTests(); + qjm.recoverUnfinalizedSegments(); + checkRecovery(cluster, 1, 4); + } + + + private QuorumJournalManager createSpyingQJM() + throws IOException, URISyntaxException { + return new QuorumJournalManager( + conf, cluster.getQuorumJournalURI(JID), FAKE_NSINFO) { + @Override + protected List createLoggers() throws IOException { + LOG.info("===> make spies"); + List realLoggers = super.createLoggers(); + List spies = Lists.newArrayList(); + for (AsyncLogger logger : realLoggers) { + spies.add(Mockito.spy(logger)); + } + return spies; + } + }; + } + + private void writeSegment(QuorumJournalManager qjm, + int startTxId, int numTxns, boolean finalize) throws IOException { + EditLogOutputStream stm = qjm.startLogSegment(startTxId); + // Should create in-progress + assertExistsInQuorum(cluster, + NNStorage.getInProgressEditsFileName(startTxId)); + + writeTxns(stm, startTxId, numTxns); + if (finalize) { + stm.close(); + qjm.finalizeLogSegment(startTxId, startTxId + numTxns - 1); + } + } + + private void writeTxns(EditLogOutputStream stm, int startTxId, int numTxns) + throws IOException { + for (long txid = startTxId; txid < startTxId + numTxns; txid++) { + TestQuorumJournalManagerUnit.writeOp(stm, txid); + } + stm.setReadyToFlush(); + stm.flush(); + } + + private void assertExistsInQuorum(MiniJournalCluster cluster, + String fname) { + int count = 0; + for (int i = 0; i < 3; i++) { + File dir = cluster.getCurrentDir(i, JID); + if (new File(dir, fname).exists()) { + count++; + } + } + assertTrue("File " + fname + " should exist in a quorum of dirs", + count >= cluster.getQuorumSize()); + } + + private void checkRecovery(MiniJournalCluster cluster, + long segmentTxId, long expectedEndTxId) + throws IOException { + int numFinalized = 0; + for (int i = 0; i < cluster.getNumNodes(); i++) { + File logDir = cluster.getCurrentDir(i, JID); + EditLogFile elf = FileJournalManager.getLogFile(logDir, segmentTxId); + if (elf == null) { + continue; + } + if (!elf.isInProgress()) { + numFinalized++; + if (elf.getLastTxId() != expectedEndTxId) { + fail("File " + elf + " finalized to wrong txid, expected " + + expectedEndTxId); + } + } + } + + if (numFinalized < cluster.getQuorumSize()) { + fail("Did not find a quorum of finalized logs starting at " + + segmentTxId); + } + } +} Added: 
hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.client; + +import static org.junit.Assert.fail; + +import static org.mockito.Matchers.eq; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.qjournal.client.AsyncLogger; +import org.apache.hadoop.hdfs.qjournal.client.QuorumException; +import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Stubber; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +/** + * True unit tests for QuorumJournalManager + */ +public class TestQuorumJournalManagerUnit { + static { + ((Log4JLogger)QuorumJournalManager.LOG).getLogger().setLevel(Level.ALL); + } + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + + private Configuration conf = new Configuration(); + private List<AsyncLogger> spyLoggers; + private QuorumJournalManager qjm; + + @Before + public void setup() throws Exception { + spyLoggers = ImmutableList.of( + mockLogger(), + mockLogger(), + mockLogger()); + + qjm = new
QuorumJournalManager(conf, new URI("qjournal://host/jid"), FAKE_NSINFO) { + @Override + protected List<AsyncLogger> createLoggers() { + return spyLoggers; + } + }; + + for (AsyncLogger logger : spyLoggers) { + futureReturns(GetJournalStateResponseProto.newBuilder() + .setLastPromisedEpoch(0) + .setHttpPort(-1) + .build()) + .when(logger).getJournalState(); + + futureReturns( + NewEpochResponseProto.newBuilder().build() + ).when(logger).newEpoch(Mockito.anyLong()); + } + + qjm.recoverUnfinalizedSegments(); + } + + private AsyncLogger mockLogger() { + return Mockito.mock(AsyncLogger.class); + } + + static <V> Stubber futureReturns(V value) { + ListenableFuture<V> ret = Futures.immediateFuture(value); + return Mockito.doReturn(ret); + } + + static Stubber futureThrows(Throwable t) { + ListenableFuture<?> ret = Futures.immediateFailedFuture(t); + return Mockito.doReturn(ret); + } + + + @Test + public void testAllLoggersStartOk() throws Exception { + futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong()); + futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong()); + futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong()); + qjm.startLogSegment(1); + } + + @Test + public void testQuorumOfLoggersStartOk() throws Exception { + futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong()); + futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong()); + futureThrows(new IOException("logger failed")) + .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong()); + qjm.startLogSegment(1); + } + + @Test + public void testQuorumOfLoggersFail() throws Exception { + futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong()); + futureThrows(new IOException("logger failed")) + .when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong()); + futureThrows(new IOException("logger failed")) + .when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong()); + try { + qjm.startLogSegment(1); + fail("Did not throw when quorum failed"); + } catch (QuorumException qe) { + GenericTestUtils.assertExceptionContains("logger failed", qe); + } + } + + @Test + public void testWriteEdits() throws Exception { + EditLogOutputStream stm = createLogSegment(); + writeOp(stm, 1); + writeOp(stm, 2); + + stm.setReadyToFlush(); + writeOp(stm, 3); + + // The flush should log txn 1-2 + futureReturns(null).when(spyLoggers.get(0)).sendEdits( + eq(1L), eq(2), Mockito.any()); + futureReturns(null).when(spyLoggers.get(1)).sendEdits( + eq(1L), eq(2), Mockito.any()); + futureReturns(null).when(spyLoggers.get(2)).sendEdits( + eq(1L), eq(2), Mockito.any()); + stm.flush(); + + // Another flush should now log txn #3 + stm.setReadyToFlush(); + futureReturns(null).when(spyLoggers.get(0)).sendEdits( + eq(3L), eq(1), Mockito.any()); + futureReturns(null).when(spyLoggers.get(1)).sendEdits( + eq(3L), eq(1), Mockito.any()); + futureReturns(null).when(spyLoggers.get(2)).sendEdits( + eq(3L), eq(1), Mockito.any()); + stm.flush(); + } + + @Test + public void testWriteEditsOneSlow() throws Exception { + EditLogOutputStream stm = createLogSegment(); + writeOp(stm, 1); + stm.setReadyToFlush(); + + // Make the first two logs respond immediately + futureReturns(null).when(spyLoggers.get(0)).sendEdits( + eq(1L), eq(1), Mockito.any()); + futureReturns(null).when(spyLoggers.get(1)).sendEdits( + eq(1L), eq(1), Mockito.any()); + + // And the third log does not respond + SettableFuture<Void> slowLog = SettableFuture.create(); +
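// The slow logger's future is never completed, so it never acknowledges; the flush() below should still return once the quorum of two loggers has responded. +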
Mockito.doReturn(slowLog).when(spyLoggers.get(2)).sendEdits( + eq(1L), eq(1), Mockito.any()); + stm.flush(); + } + + private EditLogOutputStream createLogSegment() throws IOException { + futureReturns(null).when(spyLoggers.get(0)).startLogSegment(Mockito.anyLong()); + futureReturns(null).when(spyLoggers.get(1)).startLogSegment(Mockito.anyLong()); + futureReturns(null).when(spyLoggers.get(2)).startLogSegment(Mockito.anyLong()); + EditLogOutputStream stm = qjm.startLogSegment(1); + return stm; + } + + static void writeOp(EditLogOutputStream stm, long txid) throws IOException { + FSEditLogOp op = NameNodeAdapter.createMkdirOp("tx " + txid); + op.setTransactionId(txid); + stm.write(op); + } +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.server; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; +import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProtoOrBuilder; +import org.apache.hadoop.hdfs.qjournal.server.Journal; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.common.Storage; +import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestJournal { + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + private static final NamespaceInfo FAKE_NSINFO_2 = new NamespaceInfo( + 6789, "mycluster", "my-bp", 0L, 0); + + private static final String JID = "test-journal"; + + private static final File TEST_LOG_DIR = new File( + new File(MiniDFSCluster.getBaseDirectory()), "TestJournal"); + + private StorageErrorReporter mockErrorReporter = Mockito.mock( + StorageErrorReporter.class); + + private Journal journal; + + + @Before + public void setup() throws Exception { + FileUtil.fullyDelete(TEST_LOG_DIR); + journal = new Journal(TEST_LOG_DIR, mockErrorReporter); + } + + @After + public void verifyNoStorageErrors() throws Exception{ + Mockito.verify(mockErrorReporter, Mockito.never()) + .reportErrorOnFile(Mockito.any()); + } + + @Test + public void testEpochHandling() throws Exception { + assertEquals(0, journal.getLastPromisedEpoch()); + NewEpochResponseProto newEpoch = + journal.newEpoch(FAKE_NSINFO, 1); + assertFalse(newEpoch.hasLastSegmentTxId()); + assertEquals(1, journal.getLastPromisedEpoch()); + journal.newEpoch(FAKE_NSINFO, 3); + assertFalse(newEpoch.hasLastSegmentTxId()); + assertEquals(3, journal.getLastPromisedEpoch()); + try { + journal.newEpoch(FAKE_NSINFO, 3); + fail("Should have failed to promise same epoch twice"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Proposed epoch 3 <= last promise 3", ioe); + } + try { + journal.startLogSegment(new RequestInfo(JID, 1L, 1L), + 12345L); + fail("Should have rejected call from prior epoch"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "epoch 1 is less than the last promised epoch 3", ioe); + } + try { + journal.journal(new RequestInfo(JID, 1L, 1L), + 100L, 0, new byte[0]); + fail("Should have rejected call from prior epoch"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "epoch 1 is less than the last promised epoch 3", ioe); + } + } + + @Test + public void testRestartJournal() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + journal.startLogSegment(new RequestInfo("j", 1, 1), 1); + journal.journal(new RequestInfo("j", 1, 2), 1, 2, + QJMTestUtil.createTxnData(1, 2)); + // Don't finalize. 
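+ // The un-finalized segment should still be reported by newEpoch() after a restart.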
+ + journal.close(); // close to unlock the storage dir + + // Now re-instantiate, make sure history is still there + journal = new Journal(TEST_LOG_DIR, mockErrorReporter); + assertEquals(1, journal.getLastPromisedEpoch()); + NewEpochResponseProtoOrBuilder newEpoch = journal.newEpoch(FAKE_NSINFO, 2); + assertEquals(1, newEpoch.getLastSegmentTxId()); + } + + @Test + public void testJournalLocking() throws Exception { + StorageDirectory sd = journal.getStorage().getStorageDir(0); + File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK); + + // Journal should not be locked, since we lazily initialize it. + assertFalse(lockFile.exists()); + + journal.newEpoch(FAKE_NSINFO, 1); + Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported()); + + // Journal should be locked + GenericTestUtils.assertExists(lockFile); + + Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter); + try { + journal2.newEpoch(FAKE_NSINFO, 2); + fail("Did not fail to create another journal in same dir"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Cannot lock storage", ioe); + } + + journal.close(); + + // Journal should no longer be locked after the close() call. + journal2.newEpoch(FAKE_NSINFO, 2); + } + + @Test + public void testNamespaceVerification() throws Exception { + journal.newEpoch(FAKE_NSINFO, 1); + + try { + journal.newEpoch(FAKE_NSINFO_2, 2); + fail("Did not fail newEpoch() when namespaces mismatched"); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Incompatible namespaceID", ioe); + } + } + +} Added: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java?rev=1363596&view=auto ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java (added) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java Fri Jul 20 00:25:50 2012 @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.qjournal.server; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.concurrent.ExecutionException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.qjournal.QJMTestUtil; +import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.server.Journal; +import org.apache.hadoop.hdfs.qjournal.server.JournalNode; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.base.Charsets; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.Ints; + + +public class TestJournalNode { + private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( + 12345, "mycluster", "my-bp", 0L, 0); + private static final String JID = "test-journalid"; + + private JournalNode jn; + private Journal journal; + private Configuration conf = new Configuration(); + private IPCLoggerChannel ch; + + static { + // Avoid an error when we double-initialize JvmMetrics + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @Before + public void setup() throws Exception { + conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, + "0.0.0.0:0"); + jn = new JournalNode(); + jn.setConf(conf); + jn.start(); + journal = jn.getOrCreateJournal(JID); + journal.format(FAKE_NSINFO); + + ch = new IPCLoggerChannel(conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress()); + } + + @After + public void teardown() throws Exception { + jn.stop(0); + } + + @Test + public void testJournal() throws Exception { + IPCLoggerChannel ch = new IPCLoggerChannel( + conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress()); + ch.newEpoch(1).get(); + ch.setEpoch(1); + ch.startLogSegment(1).get(); + ch.sendEdits(1, 1, "hello".getBytes(Charsets.UTF_8)).get(); + } + + + @Test + public void testReturnsSegmentInfoAtEpochTransition() throws Exception { + ch.newEpoch(1).get(); + ch.setEpoch(1); + ch.startLogSegment(1).get(); + ch.sendEdits(1, 2, QJMTestUtil.createTxnData(1, 2)).get(); + + // Switch to a new epoch without closing earlier segment + NewEpochResponseProto response = ch.newEpoch(2).get(); + ch.setEpoch(2); + assertEquals(1, response.getLastSegmentTxId()); + + ch.finalizeLogSegment(1, 2).get(); + + // Switch to a new epoch after just closing the earlier segment. 
+ response = ch.newEpoch(3).get(); + ch.setEpoch(3); + assertEquals(1, response.getLastSegmentTxId()); + + // Start a segment but don't write anything, check newEpoch segment info + ch.startLogSegment(3).get(); + response = ch.newEpoch(4).get(); + ch.setEpoch(4); + assertEquals(3, response.getLastSegmentTxId()); + } + + @Test + public void testHttpServer() throws Exception { + InetSocketAddress addr = jn.getBoundHttpAddress(); + assertTrue(addr.getPort() > 0); + + String urlRoot = "http://localhost:" + addr.getPort(); + + // Check default servlets. + String pageContents = DFSTestUtil.urlGet(new URL(urlRoot + "/jmx")); + assertTrue("Bad contents: " + pageContents, + pageContents.contains( + "Hadoop:service=JournalNode,name=JvmMetrics")); + + // Check JSP page. + pageContents = DFSTestUtil.urlGet( + new URL(urlRoot + "/journalstatus.jsp")); + assertTrue(pageContents.contains("JournalNode")); + + // Create some edits on server side + byte[] EDITS_DATA = QJMTestUtil.createTxnData(1, 3); + IPCLoggerChannel ch = new IPCLoggerChannel( + conf, FAKE_NSINFO, JID, jn.getBoundIpcAddress()); + ch.newEpoch(1).get(); + ch.setEpoch(1); + ch.startLogSegment(1).get(); + ch.sendEdits(1, 3, EDITS_DATA).get(); + ch.finalizeLogSegment(1, 3).get(); + + // Attempt to retrieve via HTTP, ensure we get the data back + // including the header we expected + byte[] retrievedViaHttp = DFSTestUtil.urlGetBytes(new URL(urlRoot + + "/getJournal?segmentTxId=1&jid=" + JID)); + byte[] expected = Bytes.concat( + Ints.toByteArray(HdfsConstants.LAYOUT_VERSION), + EDITS_DATA); + + assertArrayEquals(expected, retrievedViaHttp); + + // Attempt to fetch a non-existent file, check that we get an + // error status code + URL badUrl = new URL(urlRoot + "/getJournal?segmentTxId=12345&jid=" + JID); + HttpURLConnection connection = (HttpURLConnection)badUrl.openConnection(); + try { + assertEquals(404, connection.getResponseCode()); + } finally { + connection.disconnect(); + } + } + + /** + * Test that the JournalNode performs correctly as a Paxos + * Acceptor process. + */ + @Test + public void testAcceptRecoveryBehavior() throws Exception { + // We need to run newEpoch() first, or else we have no way to distinguish + // different proposals for the same decision. + try { + ch.prepareRecovery(1L).get(); + fail("Did not throw IllegalState when trying to run paxos without an epoch"); + } catch (ExecutionException ise) { + GenericTestUtils.assertExceptionContains("bad epoch", ise); + } + + ch.newEpoch(1).get(); + ch.setEpoch(1); + + // prepare() with no previously accepted value and no logs present + PrepareRecoveryResponseProto prep = ch.prepareRecovery(1L).get(); + System.err.println("Prep: " + prep); + assertFalse(prep.hasAcceptedInEpoch()); + assertFalse(prep.hasSegmentState()); + + // Make a log segment, and prepare again -- this time should see the + // segment existing. 
+ ch.startLogSegment(1L).get(); + ch.sendEdits(1L, 1, QJMTestUtil.createTxnData(1, 1)).get(); + + prep = ch.prepareRecovery(1L).get(); + System.err.println("Prep: " + prep); + assertFalse(prep.hasAcceptedInEpoch()); + assertTrue(prep.hasSegmentState()); + + // accept() should save the accepted value in persistent storage + // TODO: should be able to accept without a URL here + ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get(); + + // So another prepare() call from a new epoch would return this value + ch.newEpoch(2); + ch.setEpoch(2); + prep = ch.prepareRecovery(1L).get(); + assertEquals(1L, prep.getAcceptedInEpoch()); + assertEquals(1L, prep.getSegmentState().getEndTxId()); + + // A prepare() or accept() call from an earlier epoch should now be rejected + ch.setEpoch(1); + try { + ch.prepareRecovery(1L).get(); + fail("prepare from earlier epoch not rejected"); + } catch (ExecutionException ioe) { + GenericTestUtils.assertExceptionContains( + "epoch 1 is less than the last promised epoch 2", + ioe); + } + try { + ch.acceptRecovery(prep.getSegmentState(), new URL("file:///dev/null")).get(); + fail("accept from earlier epoch not rejected"); + } catch (ExecutionException ioe) { + GenericTestUtils.assertExceptionContains( + "epoch 1 is less than the last promised epoch 2", + ioe); + } + } + + // TODO: + // - add test that checks formatting behavior + // - add test that checks rejects newEpoch if nsinfo doesn't match + +} Modified: hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1363596&r1=1363595&r2=1363596&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original) +++ hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Jul 20 00:25:50 2012 @@ -17,11 +17,16 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; + import java.io.File; import java.io.IOException; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -183,6 +188,15 @@ public class NameNodeAdapter { } } + public static FSEditLogOp createMkdirOp(String path) { + MkdirOp op = MkdirOp.getInstance(new FSEditLogOp.OpInstanceCache()) + .setPath(path) + .setTimestamp(0) + .setPermissionStatus(new PermissionStatus( + "testuser", "testgroup", FsPermission.getDefault())); + return op; + } + /** * @return the number of blocks marked safe by safemode, or -1 * if safemode is not running.