hadoop-common-commits mailing list archives

From: a..@apache.org
Subject: [08/14] HDFS-6634. inotify in HDFS. Contributed by James Thomas.
Date: Tue, 02 Sep 2014 23:26:43 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
new file mode 100644
index 0000000..676f887
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+import java.util.List;
+
+/**
+ * Translates from edit log ops to inotify events.
+ */
+@InterfaceAudience.Private
+public class InotifyFSEditLogOpTranslator {
+
+  private static long getSize(FSEditLogOp.AddCloseOp acOp) {
+    long size = 0;
+    for (Block b : acOp.getBlocks()) {
+      size += b.getNumBytes();
+    }
+    return size;
+  }
+
+  public static Event[] translate(FSEditLogOp op) {
+    switch(op.opCode) {
+    case OP_ADD:
+      FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
+      if (addOp.blocks.length == 0) { // create
+        return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
+            .ctime(addOp.atime)
+            .replication(addOp.replication)
+            .ownerName(addOp.permissions.getUserName())
+            .groupName(addOp.permissions.getGroupName())
+            .perms(addOp.permissions.getPermission())
+            .iNodeType(Event.CreateEvent.INodeType.FILE).build() };
+      } else {
+        return new Event[] { new Event.AppendEvent(addOp.path) };
+      }
+    case OP_CLOSE:
+      FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
+      return new Event[] {
+          new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
+    case OP_SET_REPLICATION:
+      FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
+          .path(setRepOp.path)
+          .replication(setRepOp.replication).build() };
+    case OP_CONCAT_DELETE:
+      FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
+      List<Event> events = Lists.newArrayList();
+      events.add(new Event.AppendEvent(cdOp.trg));
+      for (String src : cdOp.srcs) {
+        events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
+      }
+      events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
+      return events.toArray(new Event[0]);
+    case OP_RENAME_OLD:
+      FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
+      return new Event[] {
+          new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
+    case OP_RENAME:
+      FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
+      return new Event[] {
+          new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
+    case OP_DELETE:
+      FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
+      return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
+    case OP_MKDIR:
+      FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
+      return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
+          .ctime(mkOp.timestamp)
+          .ownerName(mkOp.permissions.getUserName())
+          .groupName(mkOp.permissions.getGroupName())
+          .perms(mkOp.permissions.getPermission())
+          .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
+    case OP_SET_PERMISSIONS:
+      FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
+          .path(permOp.src)
+          .perms(permOp.permissions).build() };
+    case OP_SET_OWNER:
+      FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
+          .path(ownOp.src)
+          .ownerName(ownOp.username).groupName(ownOp.groupname).build() };
+    case OP_TIMES:
+      FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
+          .path(timesOp.path)
+          .atime(timesOp.atime).mtime(timesOp.mtime).build() };
+    case OP_SYMLINK:
+      FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
+      return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
+          .ctime(symOp.atime)
+          .ownerName(symOp.permissionStatus.getUserName())
+          .groupName(symOp.permissionStatus.getGroupName())
+          .perms(symOp.permissionStatus.getPermission())
+          .symlinkTarget(symOp.value)
+          .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
+    case OP_REMOVE_XATTR:
+      FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
+          .path(rxOp.src)
+          .xAttrs(rxOp.xAttrs)
+          .xAttrsRemoved(true).build() };
+    case OP_SET_XATTR:
+      FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
+          .path(sxOp.src)
+          .xAttrs(sxOp.xAttrs)
+          .xAttrsRemoved(false).build() };
+    case OP_SET_ACL:
+      FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
+      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+          .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
+          .path(saOp.src)
+          .acls(saOp.aclEntries).build() };
+    default:
+      return null;
+    }
+  }
+}
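
A note on the contract of translate(): it returns null for opcodes with no
inotify mapping, and can emit several events for a single op (OP_CONCAT_DELETE
yields an append, one unlink per source, and a close). A minimal caller-side
sketch of handling the null return -- the class name and toEvents() helper are
illustrative, not part of the patch:

package org.apache.hadoop.hdfs.server.namenode;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hdfs.inotify.Event;

public class TranslatorContractSketch {
  // Normalizes translate()'s null return (op has no inotify mapping)
  // into an empty list so callers can iterate unconditionally.
  static List<Event> toEvents(FSEditLogOp op) {
    Event[] events = InotifyFSEditLogOpTranslator.translate(op);
    return events == null
        ? Collections.<Event>emptyList()
        : Arrays.asList(events);
  }
}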

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
index 4e5bc66..667b2e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
@@ -56,6 +56,17 @@ import com.google.common.collect.Sets;
 public class JournalSet implements JournalManager {
 
   static final Log LOG = LogFactory.getLog(FSEditLog.class);
+
+  private static final Comparator<EditLogInputStream>
+    LOCAL_LOG_PREFERENCE_COMPARATOR = new Comparator<EditLogInputStream>() {
+    @Override
+    public int compare(EditLogInputStream elis1, EditLogInputStream elis2) {
+      // we want local logs to be ordered earlier in the collection, and true
+      // is considered larger than false, so we want to invert the booleans here
+      return ComparisonChain.start().compare(!elis1.isLocalLog(),
+          !elis2.isLocalLog()).result();
+    }
+  };
   
   static final public Comparator<EditLogInputStream>
     EDIT_LOG_INPUT_STREAM_COMPARATOR = new Comparator<EditLogInputStream>() {
@@ -180,6 +191,8 @@ public class JournalSet implements JournalManager {
   private final List<JournalAndStream> journals =
       new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
   final int minimumRedundantJournals;
+
+  private boolean closed;
   
   JournalSet(int minimumRedundantResources) {
     this.minimumRedundantJournals = minimumRedundantResources;
@@ -233,6 +246,11 @@ public class JournalSet implements JournalManager {
         jas.close();
       }
     }, "close journal");
+    closed = true;
+  }
+
+  public boolean isOpen() {
+    return !closed;
   }
 
   /**
@@ -281,10 +299,25 @@ public class JournalSet implements JournalManager {
       if (acc.isEmpty()) {
         acc.add(elis);
       } else {
-        long accFirstTxId = acc.get(0).getFirstTxId();
+        EditLogInputStream accFirst = acc.get(0);
+        long accFirstTxId = accFirst.getFirstTxId();
         if (accFirstTxId == elis.getFirstTxId()) {
-          acc.add(elis);
+          // if we have a finalized log segment available at this txid,
+          // we should throw out all in-progress segments at this txid
+          if (elis.isInProgress()) {
+            if (accFirst.isInProgress()) {
+              acc.add(elis);
+            }
+          } else {
+            if (accFirst.isInProgress()) {
+              acc.clear();
+            }
+            acc.add(elis);
+          }
         } else if (accFirstTxId < elis.getFirstTxId()) {
+          // try to read from the local logs first since the throughput should
+          // be higher
+          Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
           outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
@@ -296,6 +329,7 @@ public class JournalSet implements JournalManager {
       }
     }
     if (!acc.isEmpty()) {
+      Collections.sort(acc, LOCAL_LOG_PREFERENCE_COMPARATOR);
       outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
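
The inverted-boolean trick in LOCAL_LOG_PREFERENCE_COMPARATOR above works
because Boolean orders false before true, so comparing the negation of
isLocalLog() sorts local logs to the front. A standalone sketch of the same
ordering using only the JDK (the list of booleans stands in for a list of
streams):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class LocalFirstOrderingSketch {
  public static void main(String[] args) {
    // Stand-ins for EditLogInputStream.isLocalLog() results.
    List<Boolean> isLocal = new ArrayList<Boolean>(
        Arrays.asList(false, true, false, true));
    Collections.sort(isLocal, new Comparator<Boolean>() {
      @Override
      public int compare(Boolean a, Boolean b) {
        // false < true, so comparing negations puts local (true) first.
        return Boolean.valueOf(!a).compareTo(Boolean.valueOf(!b));
      }
    });
    System.out.println(isLocal); // [true, true, false, false]
  }
}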

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a0b636f..e62d162 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventsList;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -1471,5 +1474,116 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void checkAccess(String path, FsAction mode) throws IOException {
     namesystem.checkAccess(path, mode);
   }
+
+  @Override // ClientProtocol
+  public long getCurrentEditLogTxid() throws IOException {
+    namesystem.checkOperation(OperationCategory.READ); // only active
+    namesystem.checkSuperuserPrivilege();
+    // if it's not yet open for write, we may be in the process of transitioning
+    // from standby to active and may not yet know what the latest committed
+    // txid is
+    return namesystem.getEditLog().isOpenForWrite() ?
+        namesystem.getEditLog().getLastWrittenTxId() : -1;
+  }
+
+  private static FSEditLogOp readOp(EditLogInputStream elis)
+      throws IOException {
+    try {
+      return elis.readOp();
+      // we can get the two exceptions below if a segment is deleted
+      // (because we have accumulated too many edits) or (for the local journal/
+      // no-QJM case only) if an in-progress segment is finalized under us ...
+      // no need to throw an exception back to the client in this case
+    } catch (FileNotFoundException e) {
+      LOG.debug("Tried to read from deleted or moved edit log segment", e);
+      return null;
+    } catch (TransferFsImage.HttpGetFailedException e) {
+      LOG.debug("Tried to read from deleted edit log segment", e);
+      return null;
+    }
+  }
+
+  @Override // ClientProtocol
+  public EventsList getEditsFromTxid(long txid) throws IOException {
+    namesystem.checkOperation(OperationCategory.READ); // only active
+    namesystem.checkSuperuserPrivilege();
+    int maxEventsPerRPC = nn.conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
+        DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT);
+    FSEditLog log = namesystem.getFSImage().getEditLog();
+    long syncTxid = log.getSyncTxId();
+    // If we haven't synced anything yet, we can only read finalized
+    // segments since we can't reliably determine which txns in in-progress
+    // segments have actually been committed (e.g. written to a quorum of JNs).
+    // If we have synced txns, we can definitely read up to syncTxid since
+    // syncTxid is only updated after a transaction is committed to all
+    // journals. (In-progress segments written by old writers are already
+    // discarded for us, so if we read any in-progress segments they are
+    // guaranteed to have been written by this NameNode.)
+    boolean readInProgress = syncTxid > 0;
+
+    List<Event> events = Lists.newArrayList();
+    long maxSeenTxid = -1;
+    long firstSeenTxid = -1;
+
+    if (syncTxid > 0 && txid > syncTxid) {
+      // we can't read past syncTxid, so there's no point in going any further
+      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+    }
+
+    Collection<EditLogInputStream> streams = null;
+    try {
+      streams = log.selectInputStreams(txid, 0, null, readInProgress);
+    } catch (IllegalStateException e) { // can happen if we have
+      // transitioned out of active and haven't yet transitioned to standby
+      // and are using QJM -- the edit log will be closed and this exception
+      // will result
+      LOG.info("NN is transitioning from active to standby and FSEditLog " +
+      "is closed -- could not read edits");
+      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+    }
+
+    boolean breakOuter = false;
+    for (EditLogInputStream elis : streams) {
+      // our assumption in this code is the EditLogInputStreams are ordered by
+      // starting txid
+      try {
+        FSEditLogOp op = null;
+        while ((op = readOp(elis)) != null) {
+          // break out of here in the unlikely event that syncTxid is so
+          // out of date that its segment has already been deleted, so the first
+          // txid we get is greater than syncTxid
+          if (syncTxid > 0 && op.getTransactionId() > syncTxid) {
+            breakOuter = true;
+            break;
+          }
+
+          Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
+          if (eventsFromOp != null) {
+            events.addAll(Arrays.asList(eventsFromOp));
+          }
+          if (op.getTransactionId() > maxSeenTxid) {
+            maxSeenTxid = op.getTransactionId();
+          }
+          if (firstSeenTxid == -1) {
+            firstSeenTxid = op.getTransactionId();
+          }
+          if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
+              op.getTransactionId() == syncTxid)) {
+            // we're done
+            breakOuter = true;
+            break;
+          }
+        }
+      } finally {
+        elis.close();
+      }
+      if (breakOuter) {
+        break;
+      }
+    }
+
+    return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+  }
 }
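
For context, the two RPCs added above (getCurrentEditLogTxid and
getEditsFromTxid) back the client-side DFSInotifyEventInputStream. A minimal
consumer sketch -- "nn-host:8020" is a placeholder NameNode RPC address, and
the calls used here are the ones exercised by the new test further down:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInotifyEventInputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.inotify.Event;
import org.apache.hadoop.hdfs.inotify.MissingEventsException;

public class InotifyTailSketch {
  public static void main(String[] args) throws IOException,
      InterruptedException, MissingEventsException {
    DFSClient client = new DFSClient(
        new InetSocketAddress("nn-host", 8020), new HdfsConfiguration());
    DFSInotifyEventInputStream eis = client.getInotifyEventStream();
    while (true) {
      // Wait up to a second for the next translated edit; poll() returns
      // null if nothing new has been committed yet.
      Event event = eis.poll(1, TimeUnit.SECONDS);
      if (event != null) {
        System.out.println(event.getEventType());
      }
    }
  }
}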
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
index 7c642c0..674a957 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/RedundantEditLogInputStream.java
@@ -279,4 +279,9 @@ class RedundantEditLogInputStream extends EditLogInputStream {
       elis.setMaxOpSize(maxOpSize);
     }
   }
+
+  @Override
+  public boolean isLocalLog() {
+    return streams[curIdx].isLocalLog();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 242c7d7..160371a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -63,7 +63,7 @@ import org.apache.http.client.utils.URIBuilder;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-
+import org.mortbay.jetty.EofException;
 
 /**
  * This class provides fetching a specified file from the NameNode.
@@ -370,6 +370,9 @@ public class TransferFsImage {
           throttler.throttle(num, canceler);
         }
       }
+    } catch (EofException e) {
+      LOG.info("Connection closed by client");
+      out = null; // so we don't close in the finally
     } finally {
       if (out != null) {
         out.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index edffc9a..f1673ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -33,6 +33,7 @@ import "hdfs.proto";
 import "acl.proto";
 import "xattr.proto";
 import "encryption.proto";
+import "inotify.proto";
 
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client 
@@ -664,6 +665,21 @@ message CheckAccessRequestProto {
 message CheckAccessResponseProto { // void response
 }
 
+message GetCurrentEditLogTxidRequestProto {
+}
+
+message GetCurrentEditLogTxidResponseProto {
+  required int64 txid = 1;
+}
+
+message GetEditsFromTxidRequestProto {
+  required int64 txid = 1;
+}
+
+message GetEditsFromTxidResponseProto {
+  required EventsListProto eventsList = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -801,4 +817,8 @@ service ClientNamenodeProtocol {
       returns(ListEncryptionZonesResponseProto);
   rpc getEZForPath(GetEZForPathRequestProto)
       returns(GetEZForPathResponseProto);
+  rpc getCurrentEditLogTxid(GetCurrentEditLogTxidRequestProto)
+      returns(GetCurrentEditLogTxidResponseProto);
+  rpc getEditsFromTxid(GetEditsFromTxidRequestProto)
+      returns(GetEditsFromTxidResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
new file mode 100644
index 0000000..b58bfcc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+// This file contains protocol buffers used to communicate edits to clients
+// as part of the inotify system.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "InotifyProtos";
+option java_generate_equals_and_hash = true;
+package hadoop.hdfs;
+
+import "acl.proto";
+import "xattr.proto";
+import "hdfs.proto";
+
+enum EventType {
+  EVENT_CREATE = 0x0;
+  EVENT_CLOSE = 0x1;
+  EVENT_APPEND = 0x2;
+  EVENT_RENAME = 0x3;
+  EVENT_METADATA = 0x4;
+  EVENT_UNLINK = 0x5;
+}
+
+message EventProto {
+  required EventType type = 1;
+  required bytes contents = 2;
+}
+
+enum INodeType {
+  I_TYPE_FILE = 0x0;
+  I_TYPE_DIRECTORY = 0x1;
+  I_TYPE_SYMLINK = 0x2;
+}
+
+enum MetadataUpdateType {
+  META_TYPE_TIMES = 0x0;
+  META_TYPE_REPLICATION = 0x1;
+  META_TYPE_OWNER = 0x2;
+  META_TYPE_PERMS = 0x3;
+  META_TYPE_ACLS = 0x4;
+  META_TYPE_XATTRS = 0x5;
+}
+
+message CreateEventProto {
+  required INodeType type = 1;
+  required string path = 2;
+  required int64 ctime = 3;
+  required string ownerName = 4;
+  required string groupName = 5;
+  required FsPermissionProto perms = 6;
+  optional int32 replication = 7;
+  optional string symlinkTarget = 8;
+}
+
+message CloseEventProto {
+  required string path = 1;
+  required int64 fileSize = 2;
+  required int64 timestamp = 3;
+}
+
+message AppendEventProto {
+  required string path = 1;
+}
+
+message RenameEventProto {
+  required string srcPath = 1;
+  required string destPath = 2;
+  required int64 timestamp = 3;
+}
+
+message MetadataUpdateEventProto {
+  required string path = 1;
+  required MetadataUpdateType type = 2;
+  optional int64 mtime = 3;
+  optional int64 atime = 4;
+  optional int32 replication = 5;
+  optional string ownerName = 6;
+  optional string groupName = 7;
+  optional FsPermissionProto perms = 8;
+  repeated AclEntryProto acls = 9;
+  repeated XAttrProto xAttrs = 10;
+  optional bool xAttrsRemoved = 11;
+}
+
+message UnlinkEventProto {
+  required string path = 1;
+  required int64 timestamp = 2;
+}
+
+message EventsListProto {
+  repeated EventProto events = 1;
+  required int64 firstTxid = 2;
+  required int64 lastTxid = 3;
+  required int64 syncTxid = 4;
+}
\ No newline at end of file
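
EventProto above is a generic envelope: a type tag plus the serialized
per-type message in its contents field. A sketch of wrapping and unwrapping a
create event with the generated classes (class names follow from the
java_outer_classname and message definitions above; the field values are made
up):

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto;
import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos;

public class EventEnvelopeSketch {
  public static void main(String[] args) throws Exception {
    InotifyProtos.CreateEventProto create =
        InotifyProtos.CreateEventProto.newBuilder()
            .setType(InotifyProtos.INodeType.I_TYPE_FILE)
            .setPath("/demo")
            .setCtime(System.currentTimeMillis())
            .setOwnerName("hdfs")
            .setGroupName("supergroup")
            .setPerms(FsPermissionProto.newBuilder().setPerm(0644).build())
            .build();
    // Wrap: tag with the event type and serialize the payload.
    InotifyProtos.EventProto event = InotifyProtos.EventProto.newBuilder()
        .setType(InotifyProtos.EventType.EVENT_CREATE)
        .setContents(create.toByteString())
        .build();
    // Unwrap: switch on the tag, then parse the contents back out.
    if (event.getType() == InotifyProtos.EventType.EVENT_CREATE) {
      InotifyProtos.CreateEventProto roundTrip =
          InotifyProtos.CreateEventProto.parseFrom(event.getContents());
      System.out.println(roundTrip.getPath()); // /demo
    }
  }
}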

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 201560d..557727b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2066,4 +2066,14 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.inotify.max.events.per.rpc</name>
+  <value>1000</value>
+  <description>Maximum number of events that will be sent to an inotify client
+    in a single RPC response. The default value attempts to amortize away
+    the overhead for this RPC while avoiding huge memory requirements for the
+    client and NameNode (1000 events should consume no more than 1 MB).
+  </description>
+</property>
+
 </configuration>
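
The new key can also be set programmatically through the constant added to
DFSConfigKeys (referenced in the NameNodeRpcServer hunk above); a short
sketch, with 500 as an arbitrary example value:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class InotifyConfSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Halve the per-RPC event cap, e.g. for a memory-constrained client.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY,
        500);
    System.out.println(
        conf.getInt("dfs.namenode.inotify.max.events.per.rpc", -1)); // 500
  }
}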

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
new file mode 100644
index 0000000..c268281
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.MissingEventsException;
+import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.util.ExitUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.EnumSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TestDFSInotifyEventInputStream {
+
+  private static final int BLOCK_SIZE = 1024;
+  private static final Log LOG = LogFactory.getLog(
+      TestDFSInotifyEventInputStream.class);
+
+  private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
+    throws IOException, MissingEventsException {
+    Event next = null;
+    while ((next = eis.poll()) == null);
+    return next;
+  }
+
+  /**
+   * If this test fails, check whether the newly added op should map to an
+   * inotify event, and if so, establish the mapping in
+   * {@link org.apache.hadoop.hdfs.server.namenode.InotifyFSEditLogOpTranslator}
+   * and update testBasic() to include the new op.
+   */
+  @Test
+  public void testOpcodeCount() {
+    Assert.assertTrue(FSEditLogOpCodes.values().length == 46);
+  }
+
+
+  /**
+   * Tests all FsEditLogOps that are converted to inotify events.
+   */
+  @Test(timeout = 120000)
+  @SuppressWarnings("deprecation")
+  public void testBasic() throws IOException, URISyntaxException,
+      InterruptedException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
+    // so that we can get an atime change
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 1);
+
+    MiniQJMHACluster.Builder builder = new MiniQJMHACluster.Builder(conf);
+    builder.getDfsBuilder().numDataNodes(2);
+    MiniQJMHACluster cluster = builder.build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+          .getNameNodeAddress(), conf);
+      FileSystem fs = cluster.getDfsCluster().getFileSystem(0);
+      DFSTestUtil.createFile(fs, new Path("/file"), BLOCK_SIZE, (short) 1, 0L);
+      DFSTestUtil.createFile(fs, new Path("/file3"), BLOCK_SIZE, (short) 1, 0L);
+      DFSTestUtil.createFile(fs, new Path("/file5"), BLOCK_SIZE, (short) 1, 0L);
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      client.rename("/file", "/file4", null); // RenameOp -> RenameEvent
+      client.rename("/file4", "/file2"); // RenameOldOp -> RenameEvent
+      // DeleteOp, AddOp -> UnlinkEvent, CreateEvent
+      OutputStream os = client.create("/file2", true, (short) 2, BLOCK_SIZE);
+      os.write(new byte[BLOCK_SIZE]);
+      os.close(); // CloseOp -> CloseEvent
+      // AddOp -> AppendEvent
+      os = client.append("/file2", BLOCK_SIZE, null, null);
+      os.write(new byte[BLOCK_SIZE]);
+      os.close(); // CloseOp -> CloseEvent
+      Thread.sleep(10); // so that the atime will get updated on the next line
+      client.open("/file2").read(new byte[1]); // TimesOp -> MetadataUpdateEvent
+      // SetReplicationOp -> MetadataUpdateEvent
+      client.setReplication("/file2", (short) 1);
+      // ConcatDeleteOp -> AppendEvent, UnlinkEvent, CloseEvent
+      client.concat("/file2", new String[]{"/file3"});
+      client.delete("/file2", false); // DeleteOp -> UnlinkEvent
+      client.mkdirs("/dir", null, false); // MkdirOp -> CreateEvent
+      // SetPermissionsOp -> MetadataUpdateEvent
+      client.setPermission("/dir", FsPermission.valueOf("-rw-rw-rw-"));
+      // SetOwnerOp -> MetadataUpdateEvent
+      client.setOwner("/dir", "username", "groupname");
+      client.createSymlink("/dir", "/dir2", false); // SymlinkOp -> CreateEvent
+      client.setXAttr("/file5", "user.field", "value".getBytes(), EnumSet.of(
+          XAttrSetFlag.CREATE)); // SetXAttrOp -> MetadataUpdateEvent
+      // RemoveXAttrOp -> MetadataUpdateEvent
+      client.removeXAttr("/file5", "user.field");
+      // SetAclOp -> MetadataUpdateEvent
+      client.setAcl("/file5", AclEntry.parseAclSpec(
+          "user::rwx,user:foo:rw-,group::r--,other::---", true));
+      client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
+
+      Event next = null;
+
+      // RenameOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re = (Event.RenameEvent) next;
+      Assert.assertTrue(re.getDstPath().equals("/file4"));
+      Assert.assertTrue(re.getSrcPath().equals("/file"));
+      Assert.assertTrue(re.getTimestamp() > 0);
+
+      long eventsBehind = eis.getEventsBehindEstimate();
+
+      // RenameOldOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re2 = (Event.RenameEvent) next;
+      Assert.assertTrue(re2.getDstPath().equals("/file2"));
+      Assert.assertTrue(re2.getSrcPath().equals("/file4"));
+      Assert.assertTrue(re.getTimestamp() > 0);
+
+      // DeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Assert.assertTrue(((Event.UnlinkEvent) next).getPath().equals("/file2"));
+
+      // AddOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce = (Event.CreateEvent) next;
+      Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
+      Assert.assertTrue(ce.getPath().equals("/file2"));
+      Assert.assertTrue(ce.getCtime() > 0);
+      Assert.assertTrue(ce.getReplication() > 0);
+      Assert.assertTrue(ce.getSymlinkTarget() == null);
+
+      // CloseOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce2 = (Event.CloseEvent) next;
+      Assert.assertTrue(ce2.getPath().equals("/file2"));
+      Assert.assertTrue(ce2.getFileSize() > 0);
+      Assert.assertTrue(ce2.getTimestamp() > 0);
+
+      // AddOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+
+      // CloseOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
+
+      // TimesOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue.getPath().equals("/file2"));
+      Assert.assertTrue(mue.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.TIMES);
+
+      // SetReplicationOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue2.getPath().equals("/file2"));
+      Assert.assertTrue(mue2.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.REPLICATION);
+      Assert.assertTrue(mue2.getReplication() == 1);
+
+      // ConcatDeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
+      Assert.assertTrue(ue2.getPath().equals("/file3"));
+      Assert.assertTrue(ue2.getTimestamp() > 0);
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce3 = (Event.CloseEvent) next;
+      Assert.assertTrue(ce3.getPath().equals("/file2"));
+      Assert.assertTrue(ce3.getTimestamp() > 0);
+
+      // DeleteOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
+      Assert.assertTrue(ue.getPath().equals("/file2"));
+      Assert.assertTrue(ue.getTimestamp() > 0);
+
+      // MkdirOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce4 = (Event.CreateEvent) next;
+      Assert.assertTrue(ce4.getiNodeType() ==
+          Event.CreateEvent.INodeType.DIRECTORY);
+      Assert.assertTrue(ce4.getPath().equals("/dir"));
+      Assert.assertTrue(ce4.getCtime() > 0);
+      Assert.assertTrue(ce4.getReplication() == 0);
+      Assert.assertTrue(ce4.getSymlinkTarget() == null);
+
+      // SetPermissionsOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue3.getPath().equals("/dir"));
+      Assert.assertTrue(mue3.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.PERMS);
+      Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
+
+      // SetOwnerOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue4.getPath().equals("/dir"));
+      Assert.assertTrue(mue4.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.OWNER);
+      Assert.assertTrue(mue4.getOwnerName().equals("username"));
+      Assert.assertTrue(mue4.getGroupName().equals("groupname"));
+
+      // SymlinkOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce5 = (Event.CreateEvent) next;
+      Assert.assertTrue(ce5.getiNodeType() ==
+          Event.CreateEvent.INodeType.SYMLINK);
+      Assert.assertTrue(ce5.getPath().equals("/dir2"));
+      Assert.assertTrue(ce5.getCtime() > 0);
+      Assert.assertTrue(ce5.getReplication() == 0);
+      Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
+
+      // SetXAttrOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue5.getPath().equals("/file5"));
+      Assert.assertTrue(mue5.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.XATTRS);
+      Assert.assertTrue(mue5.getxAttrs().size() == 1);
+      Assert.assertTrue(mue5.getxAttrs().get(0).getName().contains("field"));
+      Assert.assertTrue(!mue5.isxAttrsRemoved());
+
+      // RemoveXAttrOp
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue6.getPath().equals("/file5"));
+      Assert.assertTrue(mue6.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.XATTRS);
+      Assert.assertTrue(mue6.getxAttrs().size() == 1);
+      Assert.assertTrue(mue6.getxAttrs().get(0).getName().contains("field"));
+      Assert.assertTrue(mue6.isxAttrsRemoved());
+
+      // SetAclOp (1)
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue7.getPath().equals("/file5"));
+      Assert.assertTrue(mue7.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.ACLS);
+      Assert.assertTrue(mue7.getAcls().contains(
+          AclEntry.parseAclEntry("user::rwx", true)));
+
+      // SetAclOp (2)
+      next = waitForNextEvent(eis);
+      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
+      Assert.assertTrue(mue8.getPath().equals("/file5"));
+      Assert.assertTrue(mue8.getMetadataType() ==
+          Event.MetadataUpdateEvent.MetadataType.ACLS);
+      Assert.assertTrue(mue8.getAcls() == null);
+
+      // Returns null when there are no further events
+      Assert.assertTrue(eis.poll() == null);
+
+      // make sure the estimate hasn't changed since the above assertion
+      // tells us that we are fully caught up to the current namesystem state
+      // and we should not have been behind at all when eventsBehind was set
+      // either, since there were few enough events that they should have all
+      // been read to the client during the first poll() call
+      Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testNNFailover() throws IOException, URISyntaxException,
+      MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client = ((DistributedFileSystem) HATestUtil.configureFailoverFs
+          (cluster.getDfsCluster(), conf)).dfs;
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      for (int i = 0; i < 10; i++) {
+        client.mkdirs("/dir" + i, null, false);
+      }
+      cluster.getDfsCluster().shutdownNameNode(0);
+      cluster.getDfsCluster().transitionToActive(1);
+      Event next = null;
+      // we can read all of the edits logged by the old active from the new
+      // active
+      for (int i = 0; i < 10; i++) {
+        next = waitForNextEvent(eis);
+        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+            i));
+      }
+      Assert.assertTrue(eis.poll() == null);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testTwoActiveNNs() throws IOException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      DFSClient client0 = new DFSClient(cluster.getDfsCluster().getNameNode(0)
+          .getNameNodeAddress(), conf);
+      DFSClient client1 = new DFSClient(cluster.getDfsCluster().getNameNode(1)
+          .getNameNodeAddress(), conf);
+      DFSInotifyEventInputStream eis = client0.getInotifyEventStream();
+      for (int i = 0; i < 10; i++) {
+        client0.mkdirs("/dir" + i, null, false);
+      }
+
+      cluster.getDfsCluster().transitionToActive(1);
+      for (int i = 10; i < 20; i++) {
+        client1.mkdirs("/dir" + i, null, false);
+      }
+
+      // make sure that the old active can't read any further than the edits
+      // it logged itself (it has no idea whether the in-progress edits from
+      // the other writer have actually been committed)
+      Event next = null;
+      for (int i = 0; i < 10; i++) {
+        next = waitForNextEvent(eis);
+        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+            i));
+      }
+      Assert.assertTrue(eis.poll() == null);
+    } finally {
+      try {
+        cluster.shutdown();
+      } catch (ExitUtil.ExitException e) {
+        // expected because the old active will be unable to flush the
+        // end-of-segment op since it is fenced
+      }
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testReadEventsWithTimeout() throws IOException,
+      InterruptedException, MissingEventsException {
+    Configuration conf = new HdfsConfiguration();
+    MiniQJMHACluster cluster = new MiniQJMHACluster.Builder(conf).build();
+
+    try {
+      cluster.getDfsCluster().waitActive();
+      cluster.getDfsCluster().transitionToActive(0);
+      final DFSClient client = new DFSClient(cluster.getDfsCluster()
+          .getNameNode(0).getNameNodeAddress(), conf);
+      DFSInotifyEventInputStream eis = client.getInotifyEventStream();
+      ScheduledExecutorService ex = Executors
+          .newSingleThreadScheduledExecutor();
+      ex.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            client.mkdirs("/dir", null, false);
+          } catch (IOException e) {
+            // test will fail
+            LOG.error("Unable to create /dir", e);
+          }
+        }
+      }, 1, TimeUnit.SECONDS);
+      // a very generous wait period -- the edit will definitely have been
+      // processed by the time this is up
+      Event next = eis.poll(5, TimeUnit.SECONDS);
+      Assert.assertTrue(next != null);
+      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
+      Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
index 3166ccc..9380701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java
@@ -56,7 +56,9 @@ public class MiniQJMHACluster {
     
     public Builder(Configuration conf) {
       this.conf = conf;
-      this.dfsBuilder = new MiniDFSCluster.Builder(conf);
+      // most QJMHACluster tests don't need DataNodes, so we'll make
+      // this the default
+      this.dfsBuilder = new MiniDFSCluster.Builder(conf).numDataNodes(0);
     }
 
     public MiniDFSCluster.Builder getDfsBuilder() {
@@ -102,7 +104,7 @@ public class MiniQJMHACluster {
         cluster = builder.dfsBuilder.nnTopology(topology)
             .manageNameDfsSharedDirs(false).build();
         cluster.waitActive();
-        cluster.shutdown();
+        cluster.shutdownNameNodes();
 
         // initialize the journal nodes
         Configuration confNN0 = cluster.getConfiguration(0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
index 4783e8f..2e38d5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
@@ -382,7 +382,7 @@ public class TestQJMWithFaults {
     }
 
     @Override
-    protected ExecutorService createExecutor() {
+    protected ExecutorService createSingleThreadExecutor() {
       return MoreExecutors.sameThreadExecutor();
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index fcb8e55..8bb39f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -939,7 +939,7 @@ public class TestQuorumJournalManager {
       public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
           String journalId, InetSocketAddress addr) {
         AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
-          protected ExecutorService createExecutor() {
+          protected ExecutorService createSingleThreadExecutor() {
             // Don't parallelize calls to the quorum in the tests.
             // This makes the tests more deterministic.
             return MoreExecutors.sameThreadExecutor();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 8074a68..47a807a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -916,6 +916,10 @@ public class TestEditLog {
     public void setMaxOpSize(int maxOpSize) {
       reader.setMaxOpSize(maxOpSize);
     }
+
+    @Override public boolean isLocalLog() {
+      return true;
+    }
   }
 
   @Test

