Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7702A11374 for ; Tue, 2 Sep 2014 21:18:04 +0000 (UTC) Received: (qmail 95660 invoked by uid 500); 2 Sep 2014 21:18:04 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 95589 invoked by uid 500); 2 Sep 2014 21:18:04 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95580 invoked by uid 99); 2 Sep 2014 21:18:04 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 Sep 2014 21:18:04 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AF5D79B094B; Tue, 2 Sep 2014 21:18:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wang@apache.org To: common-commits@hadoop.apache.org Date: Tue, 02 Sep 2014 21:18:03 -0000 Message-Id: <77c44225ea884a72b9ab319742c30432@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] HDFS-6634. inotify in HDFS. Contributed by James Thomas. Repository: hadoop Updated Branches: refs/heads/branch-2 c50371c8b -> 958c9b508 http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java new file mode 100644 index 0000000..676f887 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + +import com.google.common.collect.Lists; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.protocol.Block; + +import java.util.List; + +/** + * Translates from edit log ops to inotify events. + */ +@InterfaceAudience.Private +public class InotifyFSEditLogOpTranslator { + + private static long getSize(FSEditLogOp.AddCloseOp acOp) { + long size = 0; + for (Block b : acOp.getBlocks()) { + size += b.getNumBytes(); + } + return size; + } + + public static Event[] translate(FSEditLogOp op) { + switch(op.opCode) { + case OP_ADD: + FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op; + if (addOp.blocks.length == 0) { // create + return new Event[] { new Event.CreateEvent.Builder().path(addOp.path) + .ctime(addOp.atime) + .replication(addOp.replication) + .ownerName(addOp.permissions.getUserName()) + .groupName(addOp.permissions.getGroupName()) + .perms(addOp.permissions.getPermission()) + .iNodeType(Event.CreateEvent.INodeType.FILE).build() }; + } else { + return new Event[] { new Event.AppendEvent(addOp.path) }; + } + case OP_CLOSE: + FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op; + return new Event[] { + new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }; + case OP_SET_REPLICATION: + FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION) + .path(setRepOp.path) + .replication(setRepOp.replication).build() }; + case OP_CONCAT_DELETE: + FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op; + List events = Lists.newArrayList(); + events.add(new Event.AppendEvent(cdOp.trg)); + for (String src : cdOp.srcs) { + events.add(new Event.UnlinkEvent(src, cdOp.timestamp)); + } + events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp)); + return events.toArray(new Event[0]); + case OP_RENAME_OLD: + FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op; + return new Event[] { + new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) }; + case OP_RENAME: + FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op; + return new Event[] { + new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }; + case OP_DELETE: + FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op; + return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) }; + case OP_MKDIR: + FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op; + return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) + .ctime(mkOp.timestamp) + .ownerName(mkOp.permissions.getUserName()) + .groupName(mkOp.permissions.getGroupName()) + .perms(mkOp.permissions.getPermission()) + .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }; + case OP_SET_PERMISSIONS: + FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS) + .path(permOp.src) + .perms(permOp.permissions).build() }; + case OP_SET_OWNER: + FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER) + .path(ownOp.src) + .ownerName(ownOp.username).groupName(ownOp.groupname).build() }; + case OP_TIMES: + FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES) + .path(timesOp.path) + .atime(timesOp.atime).mtime(timesOp.mtime).build() }; + case OP_SYMLINK: + FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op; + return new Event[] { new Event.CreateEvent.Builder().path(symOp.path) + .ctime(symOp.atime) + .ownerName(symOp.permissionStatus.getUserName()) + .groupName(symOp.permissionStatus.getGroupName()) + .perms(symOp.permissionStatus.getPermission()) + .symlinkTarget(symOp.value) + .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }; + case OP_REMOVE_XATTR: + FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) + .path(rxOp.src) + .xAttrs(rxOp.xAttrs) + .xAttrsRemoved(true).build() }; + case OP_SET_XATTR: + FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) + .path(sxOp.src) + .xAttrs(sxOp.xAttrs) + .xAttrsRemoved(false).build() }; + case OP_SET_ACL: + FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op; + return new Event[] { new Event.MetadataUpdateEvent.Builder() + .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS) + .path(saOp.src) + .acls(saOp.aclEntries).build() }; + default: + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java index 0a7572e..61a1c5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java @@ -57,6 +57,17 @@ import com.google.common.collect.Sets; public class JournalSet implements JournalManager { static final Log LOG = LogFactory.getLog(FSEditLog.class); + + private static final Comparator + LOCAL_LOG_PREFERENCE_COMPARATOR = new Comparator() { + @Override + public int compare(EditLogInputStream elis1, EditLogInputStream elis2) { + // we want local logs to be ordered earlier in the collection, and true + // is considered larger than false, so we want to invert the booleans here + return ComparisonChain.start().compare(!elis1.isLocalLog(), + !elis2.isLocalLog()).result(); + } + }; static final public Comparator EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator() { @@ -181,6 +192,8 @@ public class JournalSet implements JournalManager { private final List journals = new CopyOnWriteArrayList(); final int minimumRedundantJournals; + + private boolean closed; JournalSet(int minimumRedundantResources) { this.minimumRedundantJournals = minimumRedundantResources; @@ -234,6 +247,11 @@ public class JournalSet implements JournalManager { jas.close(); } }, "close journal"); + closed = true; + } + + public boolean isOpen() { + return !closed; } /** @@ -282,10 +300,25 @@ public class JournalSet implements JournalManager { if (acc.isEmpty()) { acc.add(elis); } else { - long accFirstTxId = acc.get(0).getFirstTxId(); + EditLogInputStream accFirst = acc.get(0); + long accFirstTxId = accFirst.getFirstTxId(); if (accFirstTxId == elis.getFirstTxId()) { - acc.add(elis); + // if we have a finalized log segment available at this txid, + // we should throw out all in-progress segments at this txid + if (elis.isInProgress()) { + if (accFirst.isInProgress()) { + acc.add(elis); + } + } else { + if (accFirst.isInProgress()) { + acc.clear(); + } + acc.add(elis); + } } else if (accFirstTxId < elis.getFirstTxId()) { + // try to read from the local logs first since the throughput should + // be higher + Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR); outStreams.add(new RedundantEditLogInputStream(acc, fromTxId)); acc.clear(); acc.add(elis); @@ -297,6 +330,7 @@ public class JournalSet implements JournalManager { } } if (!acc.isEmpty()) { + Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR); outStreams.add(new RedundantEditLogInputStream(acc, fromTxId)); acc.clear(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 1a3d107..f451d1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -66,6 +67,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -1466,5 +1469,116 @@ class NameNodeRpcServer implements NamenodeProtocols { public void checkAccess(String path, FsAction mode) throws IOException { namesystem.checkAccess(path, mode); } + + @Override // ClientProtocol + public long getCurrentEditLogTxid() throws IOException { + namesystem.checkOperation(OperationCategory.READ); // only active + namesystem.checkSuperuserPrivilege(); + // if it's not yet open for write, we may be in the process of transitioning + // from standby to active and may not yet know what the latest committed + // txid is + return namesystem.getEditLog().isOpenForWrite() ? + namesystem.getEditLog().getLastWrittenTxId() : -1; + } + + private static FSEditLogOp readOp(EditLogInputStream elis) + throws IOException { + try { + return elis.readOp(); + // we can get the below two exceptions if a segment is deleted + // (because we have accumulated too many edits) or (for the local journal/ + // no-QJM case only) if a in-progress segment is finalized under us ... + // no need to throw an exception back to the client in this case + } catch (FileNotFoundException e) { + LOG.debug("Tried to read from deleted or moved edit log segment", e); + return null; + } catch (TransferFsImage.HttpGetFailedException e) { + LOG.debug("Tried to read from deleted edit log segment", e); + return null; + } + } + + @Override // ClientProtocol + public EventsList getEditsFromTxid(long txid) throws IOException { + namesystem.checkOperation(OperationCategory.READ); // only active + namesystem.checkSuperuserPrivilege(); + int maxEventsPerRPC = nn.conf.getInt( + DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY, + DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT); + FSEditLog log = namesystem.getFSImage().getEditLog(); + long syncTxid = log.getSyncTxId(); + // If we haven't synced anything yet, we can only read finalized + // segments since we can't reliably determine which txns in in-progress + // segments have actually been committed (e.g. written to a quorum of JNs). + // If we have synced txns, we can definitely read up to syncTxid since + // syncTxid is only updated after a transaction is committed to all + // journals. (In-progress segments written by old writers are already + // discarded for us, so if we read any in-progress segments they are + // guaranteed to have been written by this NameNode.) + boolean readInProgress = syncTxid > 0; + + List events = Lists.newArrayList(); + long maxSeenTxid = -1; + long firstSeenTxid = -1; + + if (syncTxid > 0 && txid > syncTxid) { + // we can't read past syncTxid, so there's no point in going any further + return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + } + + Collection streams = null; + try { + streams = log.selectInputStreams(txid, 0, null, readInProgress); + } catch (IllegalStateException e) { // can happen if we have + // transitioned out of active and haven't yet transitioned to standby + // and are using QJM -- the edit log will be closed and this exception + // will result + LOG.info("NN is transitioning from active to standby and FSEditLog " + + "is closed -- could not read edits"); + return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + } + + boolean breakOuter = false; + for (EditLogInputStream elis : streams) { + // our assumption in this code is the EditLogInputStreams are ordered by + // starting txid + try { + FSEditLogOp op = null; + while ((op = readOp(elis)) != null) { + // break out of here in the unlikely event that syncTxid is so + // out of date that its segment has already been deleted, so the first + // txid we get is greater than syncTxid + if (syncTxid > 0 && op.getTransactionId() > syncTxid) { + breakOuter = true; + break; + } + + Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op); + if (eventsFromOp != null) { + events.addAll(Arrays.asList(eventsFromOp)); + } + if (op.getTransactionId() > maxSeenTxid) { + maxSeenTxid = op.getTransactionId(); + } + if (firstSeenTxid == -1) { + firstSeenTxid = op.getTransactionId(); + } + if (events.size() >= maxEventsPerRPC || (syncTxid > 0 && + op.getTransactionId() == syncTxid)) { + // we're done + breakOuter = true; + break; + } + } + } finally { + elis.close(); + } + if (breakOuter) { + break; + } + } + + return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java index 7c642c0..674a957 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java @@ -279,4 +279,9 @@ class RedundantEditLogInputStream extends EditLogInputStream { elis.setMaxOpSize(maxOpSize); } } + + @Override + public boolean isLocalLog() { + return streams[curIdx].isLocalLog(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index 242c7d7..160371a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -63,7 +63,7 @@ import org.apache.http.client.utils.URIBuilder; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; - +import org.mortbay.jetty.EofException; /** * This class provides fetching a specified file from the NameNode. @@ -370,6 +370,9 @@ public class TransferFsImage { throttler.throttle(num, canceler); } } + } catch (EofException e) { + LOG.info("Connection closed by client"); + out = null; // so we don't close in the finally } finally { if (out != null) { out.close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto index edffc9a..f1673ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto @@ -33,6 +33,7 @@ import "hdfs.proto"; import "acl.proto"; import "xattr.proto"; import "encryption.proto"; +import "inotify.proto"; /** * The ClientNamenodeProtocol Service defines the interface between a client @@ -664,6 +665,21 @@ message CheckAccessRequestProto { message CheckAccessResponseProto { // void response } +message GetCurrentEditLogTxidRequestProto { +} + +message GetCurrentEditLogTxidResponseProto { + required int64 txid = 1; +} + +message GetEditsFromTxidRequestProto { + required int64 txid = 1; +} + +message GetEditsFromTxidResponseProto { + required EventsListProto eventsList = 1; +} + service ClientNamenodeProtocol { rpc getBlockLocations(GetBlockLocationsRequestProto) returns(GetBlockLocationsResponseProto); @@ -801,4 +817,8 @@ service ClientNamenodeProtocol { returns(ListEncryptionZonesResponseProto); rpc getEZForPath(GetEZForPathRequestProto) returns(GetEZForPathResponseProto); + rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto) + returns(GetCurrentEditLogTxidResponseProto); + rpc getEditsFromTxid(GetEditsFromTxidRequestProto) + returns(GetEditsFromTxidResponseProto); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto new file mode 100644 index 0000000..b58bfcc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are private and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. + */ + +// This file contains protocol buffers used to communicate edits to clients +// as part of the inotify system. + +option java_package = "org.apache.hadoop.hdfs.protocol.proto"; +option java_outer_classname = "InotifyProtos"; +option java_generate_equals_and_hash = true; +package hadoop.hdfs; + +import "acl.proto"; +import "xattr.proto"; +import "hdfs.proto"; + +enum EventType { + EVENT_CREATE = 0x0; + EVENT_CLOSE = 0x1; + EVENT_APPEND = 0x2; + EVENT_RENAME = 0x3; + EVENT_METADATA = 0x4; + EVENT_UNLINK = 0x5; +} + +message EventProto { + required EventType type = 1; + required bytes contents = 2; +} + +enum INodeType { + I_TYPE_FILE = 0x0; + I_TYPE_DIRECTORY = 0x1; + I_TYPE_SYMLINK = 0x2; +} + +enum MetadataUpdateType { + META_TYPE_TIMES = 0x0; + META_TYPE_REPLICATION = 0x1; + META_TYPE_OWNER = 0x2; + META_TYPE_PERMS = 0x3; + META_TYPE_ACLS = 0x4; + META_TYPE_XATTRS = 0x5; +} + +message CreateEventProto { + required INodeType type = 1; + required string path = 2; + required int64 ctime = 3; + required string ownerName = 4; + required string groupName = 5; + required FsPermissionProto perms = 6; + optional int32 replication = 7; + optional string symlinkTarget = 8; +} + +message CloseEventProto { + required string path = 1; + required int64 fileSize = 2; + required int64 timestamp = 3; +} + +message AppendEventProto { + required string path = 1; +} + +message RenameEventProto { + required string srcPath = 1; + required string destPath = 2; + required int64 timestamp = 3; +} + +message MetadataUpdateEventProto { + required string path = 1; + required MetadataUpdateType type = 2; + optional int64 mtime = 3; + optional int64 atime = 4; + optional int32 replication = 5; + optional string ownerName = 6; + optional string groupName = 7; + optional FsPermissionProto perms = 8; + repeated AclEntryProto acls = 9; + repeated XAttrProto xAttrs = 10; + optional bool xAttrsRemoved = 11; +} + +message UnlinkEventProto { + required string path = 1; + required int64 timestamp = 2; +} + +message EventsListProto { + repeated EventProto events = 1; + required int64 firstTxid = 2; + required int64 lastTxid = 3; + required int64 syncTxid = 4; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 4b843f0..1cebdd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2058,4 +2058,14 @@ + + dfs.namenode.inotify.max.events.per.rpc + 1000 + Maximum number of events that will be sent to an inotify client + in a single RPC response. The default value attempts to amortize away + the overhead for this RPC while avoiding huge memory requirements for the + client and NameNode (1000 events should consume no more than 1 MB.) + + + http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java new file mode 100644 index 0000000..c268281 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.MissingEventsException; +import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; +import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; +import org.apache.hadoop.util.ExitUtil; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.URISyntaxException; +import java.util.EnumSet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class TestDFSInotifyEventInputStream { + + private static final int BLOCK_SIZE = 1024; + private static final Log LOG = LogFactory.getLog( + TestDFSInotifyEventInputStream.class); + + private static Event waitForNextEvent(DFSInotifyEventInputStream eis) + throws IOException, MissingEventsException { + Event next = null; + while ((next = eis.poll()) == null); + return next; + } + + /** + * If this test fails, check whether the newly added op should map to an + * inotify event, and if so, establish the mapping in + * {@link org.apache.hadoop.hdfs.server.namenode.InotifyFSEditLogOpTranslator} + * and update testBasic() to include the new op. + */ + @Test + public void testOpcodeCount() { + Assert.assertTrue(FSEditLogOpCodes.values().length == 46); + } + + + /** + * Tests all FsEditLogOps that are converted to inotify events. + */ + @Test(timeout = 120000) + @SuppressWarnings("deprecation") + public void testBasic() throws IOException, URISyntaxException, + InterruptedException, MissingEventsException { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); + // so that we can get an atime change + conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1); + + MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf); + builder.getDfsBuilder().numDataNodes(2); + MiniQJMHACluster cluster = builder.build(); + + try { + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0) + .getNameNodeAddress(), conf); + FileSystem fs = cluster.getDfsCluster().getFileSystem(0); + DFSTestUtil.createFile(fs, new Path("/file"), BLOCK_SIZE, (short) 1, 0L); + DFSTestUtil.createFile(fs, new Path("/file3"), BLOCK_SIZE, (short) 1, 0L); + DFSTestUtil.createFile(fs, new Path("/file5"), BLOCK_SIZE, (short) 1, 0L); + DFSInotifyEventInputStream eis = client.getInotifyEventStream(); + client.rename("/file", "/file4", null); // RenameOp -> RenameEvent + client.rename("/file4", "/file2"); // RenameOldOp -> RenameEvent + // DeleteOp, AddOp -> UnlinkEvent, CreateEvent + OutputStream os = client.create("/file2", true, (short) 2, BLOCK_SIZE); + os.write(new byte[BLOCK_SIZE]); + os.close(); // CloseOp -> CloseEvent + // AddOp -> AppendEvent + os = client.append("/file2", BLOCK_SIZE, null, null); + os.write(new byte[BLOCK_SIZE]); + os.close(); // CloseOp -> CloseEvent + Thread.sleep(10); // so that the atime will get updated on the next line + client.open("/file2").read(new byte[1]); // TimesOp -> MetadataUpdateEvent + // SetReplicationOp -> MetadataUpdateEvent + client.setReplication("/file2", (short) 1); + // ConcatDeleteOp -> AppendEvent, UnlinkEvent, CloseEvent + client.concat("/file2", new String[]{"/file3"}); + client.delete("/file2", false); // DeleteOp -> UnlinkEvent + client.mkdirs("/dir", null, false); // MkdirOp -> CreateEvent + // SetPermissionsOp -> MetadataUpdateEvent + client.setPermission("/dir", FsPermission.valueOf("-rw-rw-rw-")); + // SetOwnerOp -> MetadataUpdateEvent + client.setOwner("/dir", "username", "groupname"); + client.createSymlink("/dir", "/dir2", false); // SymlinkOp -> CreateEvent + client.setXAttr("/file5", "user.field", "value".getBytes(), EnumSet.of( + XAttrSetFlag.CREATE)); // SetXAttrOp -> MetadataUpdateEvent + // RemoveXAttrOp -> MetadataUpdateEvent + client.removeXAttr("/file5", "user.field"); + // SetAclOp -> MetadataUpdateEvent + client.setAcl("/file5", AclEntry.parseAclSpec( + "user::rwx,user:foo:rw-,group::r--,other::---", true)); + client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent + + Event next = null; + + // RenameOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); + Event.RenameEvent re = (Event.RenameEvent) next; + Assert.assertTrue(re.getDstPath().equals("/file4")); + Assert.assertTrue(re.getSrcPath().equals("/file")); + Assert.assertTrue(re.getTimestamp() > 0); + + long eventsBehind = eis.getEventsBehindEstimate(); + + // RenameOldOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); + Event.RenameEvent re2 = (Event.RenameEvent) next; + Assert.assertTrue(re2.getDstPath().equals("/file2")); + Assert.assertTrue(re2.getSrcPath().equals("/file4")); + Assert.assertTrue(re.getTimestamp() > 0); + + // DeleteOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); + Assert.assertTrue(((Event.UnlinkEvent) next).getPath().equals("/file2")); + + // AddOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce = (Event.CreateEvent) next; + Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); + Assert.assertTrue(ce.getPath().equals("/file2")); + Assert.assertTrue(ce.getCtime() > 0); + Assert.assertTrue(ce.getReplication() > 0); + Assert.assertTrue(ce.getSymlinkTarget() == null); + + // CloseOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce2 = (Event.CloseEvent) next; + Assert.assertTrue(ce2.getPath().equals("/file2")); + Assert.assertTrue(ce2.getFileSize() > 0); + Assert.assertTrue(ce2.getTimestamp() > 0); + + // AddOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); + + // CloseOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); + Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2")); + + // TimesOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue.getPath().equals("/file2")); + Assert.assertTrue(mue.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.TIMES); + + // SetReplicationOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue2.getPath().equals("/file2")); + Assert.assertTrue(mue2.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.REPLICATION); + Assert.assertTrue(mue2.getReplication() == 1); + + // ConcatDeleteOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next; + Assert.assertTrue(ue2.getPath().equals("/file3")); + Assert.assertTrue(ue2.getTimestamp() > 0); + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce3 = (Event.CloseEvent) next; + Assert.assertTrue(ce3.getPath().equals("/file2")); + Assert.assertTrue(ce3.getTimestamp() > 0); + + // DeleteOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue = (Event.UnlinkEvent) next; + Assert.assertTrue(ue.getPath().equals("/file2")); + Assert.assertTrue(ue.getTimestamp() > 0); + + // MkdirOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce4 = (Event.CreateEvent) next; + Assert.assertTrue(ce4.getiNodeType() == + Event.CreateEvent.INodeType.DIRECTORY); + Assert.assertTrue(ce4.getPath().equals("/dir")); + Assert.assertTrue(ce4.getCtime() > 0); + Assert.assertTrue(ce4.getReplication() == 0); + Assert.assertTrue(ce4.getSymlinkTarget() == null); + + // SetPermissionsOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue3.getPath().equals("/dir")); + Assert.assertTrue(mue3.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.PERMS); + Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-")); + + // SetOwnerOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue4.getPath().equals("/dir")); + Assert.assertTrue(mue4.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.OWNER); + Assert.assertTrue(mue4.getOwnerName().equals("username")); + Assert.assertTrue(mue4.getGroupName().equals("groupname")); + + // SymlinkOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce5 = (Event.CreateEvent) next; + Assert.assertTrue(ce5.getiNodeType() == + Event.CreateEvent.INodeType.SYMLINK); + Assert.assertTrue(ce5.getPath().equals("/dir2")); + Assert.assertTrue(ce5.getCtime() > 0); + Assert.assertTrue(ce5.getReplication() == 0); + Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir")); + + // SetXAttrOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue5.getPath().equals("/file5")); + Assert.assertTrue(mue5.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.XATTRS); + Assert.assertTrue(mue5.getxAttrs().size() == 1); + Assert.assertTrue(mue5.getxAttrs().get(0).getName().contains("field")); + Assert.assertTrue(!mue5.isxAttrsRemoved()); + + // RemoveXAttrOp + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue6.getPath().equals("/file5")); + Assert.assertTrue(mue6.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.XATTRS); + Assert.assertTrue(mue6.getxAttrs().size() == 1); + Assert.assertTrue(mue6.getxAttrs().get(0).getName().contains("field")); + Assert.assertTrue(mue6.isxAttrsRemoved()); + + // SetAclOp (1) + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue7.getPath().equals("/file5")); + Assert.assertTrue(mue7.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.ACLS); + Assert.assertTrue(mue7.getAcls().contains( + AclEntry.parseAclEntry("user::rwx", true))); + + // SetAclOp (2) + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next; + Assert.assertTrue(mue8.getPath().equals("/file5")); + Assert.assertTrue(mue8.getMetadataType() == + Event.MetadataUpdateEvent.MetadataType.ACLS); + Assert.assertTrue(mue8.getAcls() == null); + + // Returns null when there are no further events + Assert.assertTrue(eis.poll() == null); + + // make sure the estimate hasn't changed since the above assertion + // tells us that we are fully caught up to the current namesystem state + // and we should not have been behind at all when eventsBehind was set + // either, since there were few enough events that they should have all + // been read to the client during the first poll() call + Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind); + + } finally { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testNNFailover() throws IOException, URISyntaxException, + MissingEventsException { + Configuration conf = new HdfsConfiguration(); + MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); + + try { + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs + (cluster.getDfsCluster(), conf)).dfs; + DFSInotifyEventInputStream eis = client.getInotifyEventStream(); + for (int i = 0; i < 10; i++) { + client.mkdirs("/dir" + i, null, false); + } + cluster.getDfsCluster().shutdownNameNode(0); + cluster.getDfsCluster().transitionToActive(1); + Event next = null; + // we can read all of the edits logged by the old active from the new + // active + for (int i = 0; i < 10; i++) { + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + i)); + } + Assert.assertTrue(eis.poll() == null); + } finally { + cluster.shutdown(); + } + } + + @Test(timeout = 120000) + public void testTwoActiveNNs() throws IOException, MissingEventsException { + Configuration conf = new HdfsConfiguration(); + MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); + + try { + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0) + .getNameNodeAddress(), conf); + DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1) + .getNameNodeAddress(), conf); + DFSInotifyEventInputStream eis = client0.getInotifyEventStream(); + for (int i = 0; i < 10; i++) { + client0.mkdirs("/dir" + i, null, false); + } + + cluster.getDfsCluster().transitionToActive(1); + for (int i = 10; i < 20; i++) { + client1.mkdirs("/dir" + i, null, false); + } + + // make sure that the old active can't read any further than the edits + // it logged itself (it has no idea whether the in-progress edits from + // the other writer have actually been committed) + Event next = null; + for (int i = 0; i < 10; i++) { + next = waitForNextEvent(eis); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + i)); + } + Assert.assertTrue(eis.poll() == null); + } finally { + try { + cluster.shutdown(); + } catch (ExitUtil.ExitException e) { + // expected because the old active will be unable to flush the + // end-of-segment op since it is fenced + } + } + } + + @Test(timeout = 120000) + public void testReadEventsWithTimeout() throws IOException, + InterruptedException, MissingEventsException { + Configuration conf = new HdfsConfiguration(); + MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build(); + + try { + cluster.getDfsCluster().waitActive(); + cluster.getDfsCluster().transitionToActive(0); + final DFSClient client = new DFSClient(cluster.getDfsCluster() + .getNameNode(0).getNameNodeAddress(), conf); + DFSInotifyEventInputStream eis = client.getInotifyEventStream(); + ScheduledExecutorService ex = Executors + .newSingleThreadScheduledExecutor(); + ex.schedule(new Runnable() { + @Override + public void run() { + try { + client.mkdirs("/dir", null, false); + } catch (IOException e) { + // test will fail + LOG.error("Unable to create /dir", e); + } + } + }, 1, TimeUnit.SECONDS); + // a very generous wait period -- the edit will definitely have been + // processed by the time this is up + Event next = eis.poll(5, TimeUnit.SECONDS); + Assert.assertTrue(next != null); + Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir")); + } finally { + cluster.shutdown(); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index 3166ccc..9380701 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -56,7 +56,9 @@ public class MiniQJMHACluster { public Builder(Configuration conf) { this.conf = conf; - this.dfsBuilder = new MiniDFSCluster.Builder(conf); + // most QJMHACluster tests don't need DataNodes, so we'll make + // this the default + this.dfsBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(0); } public MiniDFSCluster.Builder getDfsBuilder() { @@ -102,7 +104,7 @@ public class MiniQJMHACluster { cluster = builder.dfsBuilder.nnTopology(topology) .manageNameDfsSharedDirs(false).build(); cluster.waitActive(); - cluster.shutdown(); + cluster.shutdownNameNodes(); // initialize the journal nodes Configuration confNN0 = cluster.getConfiguration(0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java index 4783e8f..2e38d5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java @@ -382,7 +382,7 @@ public class TestQJMWithFaults { } @Override - protected ExecutorService createExecutor() { + protected ExecutorService createSingleThreadExecutor() { return MoreExecutors.sameThreadExecutor(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index fcb8e55..8bb39f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -939,7 +939,7 @@ public class TestQuorumJournalManager { public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) { AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) { - protected ExecutorService createExecutor() { + protected ExecutorService createSingleThreadExecutor() { // Don't parallelize calls to the quorum in the tests. // This makes the tests more deterministic. return MoreExecutors.sameThreadExecutor(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/958c9b50/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 1c698c7..d2846f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -916,6 +916,10 @@ public class TestEditLog { public void setMaxOpSize(int maxOpSize) { reader.setMaxOpSize(maxOpSize); } + + @Override public boolean isLocalLog() { + return true; + } } @Test