hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [16/50] [abbrv] hadoop git commit: HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe) (cherry picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
Date Sat, 26 Sep 2015 16:05:23 GMT
HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe)
(cherry picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)

(cherry picked from commit 06552a15d5172a2b0ad3d61aa7f9a849857385aa)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43631451
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43631451
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43631451

Branch: refs/heads/branch-2.6
Commit: 4363145128f91b2fb1f1c0254ee5e8621a1ac383
Parents: 8ed162b
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Tue Nov 25 17:44:34 2014 -0800
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Tue Sep 8 15:32:17 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/DFSInotifyEventInputStream.java |  65 ++--
 .../apache/hadoop/hdfs/inotify/EventBatch.java  |  41 +++
 .../hadoop/hdfs/inotify/EventBatchList.java     |  63 ++++
 .../apache/hadoop/hdfs/inotify/EventsList.java  |  63 ----
 .../hadoop/hdfs/protocol/ClientProtocol.java    |   8 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 341 ++++++++++---------
 .../namenode/InotifyFSEditLogOpTranslator.java  |  74 ++--
 .../hdfs/server/namenode/NameNodeRpcServer.java |  45 +--
 .../hadoop-hdfs/src/main/proto/inotify.proto    |  10 +-
 .../hdfs/TestDFSInotifyEventInputStream.java    | 209 +++++++-----
 12 files changed, 526 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d2f07c2..1f6ce36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED
     HDFS-7980. Incremental BlockReport will dramatically slow down namenode
     startup.  (Walter Su via szetszwo)
 
+    HDFS-7446. HDFS inotify should have the ability to determine what txid it
+    has read up to (cmccabe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
index 73c5f55..83b92b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java
@@ -19,11 +19,10 @@
 package org.apache.hadoop.hdfs;
 
 import com.google.common.collect.Iterators;
-import com.google.common.util.concurrent.UncheckedExecutionException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.inotify.MissingEventsException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.util.Time;
@@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * Stream for reading inotify events. DFSInotifyEventInputStreams should not
@@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream {
       .class);
 
   private final ClientProtocol namenode;
-  private Iterator<Event> it;
+  private Iterator<EventBatch> it;
   private long lastReadTxid;
   /**
    * The most recent txid the NameNode told us it has sync'ed -- helps us
@@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream {
   }
 
   /**
-   * Returns the next event in the stream or null if no new events are currently
-   * available.
+   * Returns the next batch of events in the stream or null if no new
+   * batches are currently available.
    *
    * @throws IOException because of network error or edit log
    * corruption. Also possible if JournalNodes are unresponsive in the
    * QJM setting (even one unresponsive JournalNode is enough in rare cases),
    * so catching this exception and retrying at least a few times is
    * recommended.
-   * @throws MissingEventsException if we cannot return the next event in the
-   * stream because the data for the event (and possibly some subsequent events)
-   * has been deleted (generally because this stream is a very large number of
-   * events behind the current state of the NameNode). It is safe to continue
-   * reading from the stream after this exception is thrown -- the next
-   * available event will be returned.
+   * @throws MissingEventsException if we cannot return the next batch in the
+   * stream because the data for the events (and possibly some subsequent
+   * events) has been deleted (generally because this stream is a very large
+   * number of transactions behind the current state of the NameNode). It is
+   * safe to continue reading from the stream after this exception is thrown;
+   * the next available batch of events will be returned.
    */
-  public Event poll() throws IOException, MissingEventsException {
+  public EventBatch poll() throws IOException, MissingEventsException {
     // need to keep retrying until the NN sends us the latest committed txid
     if (lastReadTxid == -1) {
       LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
@@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream {
       return null;
     }
     if (!it.hasNext()) {
-      EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1);
+      EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
       if (el.getLastTxid() != -1) {
         // we only want to set syncTxid when we were actually able to read some
         // edits on the NN -- otherwise it will seem like edits are being
         // generated faster than we can read them when the problem is really
         // that we are temporarily unable to read edits
         syncTxid = el.getSyncTxid();
-        it = el.getEvents().iterator();
+        it = el.getBatches().iterator();
         long formerLastReadTxid = lastReadTxid;
         lastReadTxid = el.getLastTxid();
         if (el.getFirstTxid() != formerLastReadTxid + 1) {
@@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream {
   }
 
   /**
-   * Return a estimate of how many events behind the NameNode's current state
-   * this stream is. Clients should periodically call this method and check if
-   * its result is steadily increasing, which indicates that they are falling
-   * behind (i.e. events are being generated faster than the client is reading
-   * them). If a client falls too far behind events may be deleted before the
-   * client can read them.
+   * Return an estimate of how many transaction IDs behind the NameNode's
+   * current state this stream is. Clients should periodically call this method
+   * and check if its result is steadily increasing, which indicates that they
+   * are falling behind (i.e. transactions are being generated faster than the
+   * client is reading them). If a client falls too far behind events may be
+   * deleted before the client can read them.
    * <p/>
    * A return value of -1 indicates that an estimate could not be produced, and
    * should be ignored. The value returned by this method is really only useful
    * when compared to previous or subsequent returned values.
    */
-  public long getEventsBehindEstimate() {
+  public long getTxidsBehindEstimate() {
     if (syncTxid == 0) {
       return -1;
     } else {
@@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream {
   }
 
   /**
-   * Returns the next event in the stream, waiting up to the specified amount of
-   * time for a new event. Returns null if a new event is not available at the
+   * Returns the next event batch in the stream, waiting up to the specified
+   * amount of time for a new batch. Returns null if one is not available at the
    * end of the specified amount of time. The time before the method returns may
    * exceed the specified amount of time by up to the time required for an RPC
    * to the NameNode.
@@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream {
    * see {@link DFSInotifyEventInputStream#poll()}
    * @throws InterruptedException if the calling thread is interrupted
    */
-  public Event poll(long time, TimeUnit tu) throws IOException,
+  public EventBatch poll(long time, TimeUnit tu) throws IOException,
       InterruptedException, MissingEventsException {
     long initialTime = Time.monotonicNow();
     long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
     long nextWait = INITIAL_WAIT_MS;
-    Event next = null;
+    EventBatch next = null;
     while ((next = poll()) == null) {
       long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
       if (timeLeft <= 0) {
@@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream {
   }
 
   /**
-   * Returns the next event in the stream, waiting indefinitely if a new event
-   * is not immediately available.
+   * Returns the next batch of events in the stream, waiting indefinitely if
+   * a new batch is not immediately available.
    *
    * @throws IOException see {@link DFSInotifyEventInputStream#poll()}
    * @throws MissingEventsException see
    * {@link DFSInotifyEventInputStream#poll()}
    * @throws InterruptedException if the calling thread is interrupted
    */
-  public Event take() throws IOException, InterruptedException,
+  public EventBatch take() throws IOException, InterruptedException,
       MissingEventsException {
-    Event next = null;
+    EventBatch next = null;
     int nextWaitMin = INITIAL_WAIT_MS;
     while ((next = poll()) == null) {
       // sleep for a random period between nextWaitMin and nextWaitMin * 2

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
new file mode 100644
index 0000000..0ad1070
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A batch of events that all happened on the same transaction ID.
+ */
+@InterfaceAudience.Public
+public class EventBatch {
+  private final long txid;
+  private final Event[] events;
+
+  public EventBatch(long txid, Event[] events) {
+    this.txid = txid;
+    this.events = events;
+  }
+
+  public long getTxid() {
+    return txid;
+  }
+
+  public Event[] getEvents() { return events; }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
new file mode 100644
index 0000000..9c97038
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.inotify;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.List;
+
+/**
+ * Contains a list of event batches, the transaction ID in the edit log up to
+ * which we read to produce these events, and the first txid we observed when
+ * producing these events (the last of which is for the purpose of determining
+ * whether we have missed events due to edit deletion). Also contains the most
+ * recent txid that the NameNode has sync'ed, so the client can determine how
+ * far behind in the edit log it is.
+ */
+@InterfaceAudience.Private
+public class EventBatchList {
+  private List<EventBatch> batches;
+  private long firstTxid;
+  private long lastTxid;
+  private long syncTxid;
+
+  public EventBatchList(List<EventBatch> batches, long firstTxid,
+                         long lastTxid, long syncTxid) {
+    this.batches = batches;
+    this.firstTxid = firstTxid;
+    this.lastTxid = lastTxid;
+    this.syncTxid = syncTxid;
+  }
+
+  public List<EventBatch> getBatches() {
+    return batches;
+  }
+
+  public long getFirstTxid() {
+    return firstTxid;
+  }
+
+  public long getLastTxid() {
+    return lastTxid;
+  }
+
+  public long getSyncTxid() {
+    return syncTxid;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
deleted file mode 100644
index 6d02d3c..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.inotify;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.List;
-
-/**
- * Contains a set of events, the transaction ID in the edit log up to which we
- * read to produce these events, and the first txid we observed when producing
- * these events (the last of which is for the purpose of determining whether we
- * have missed events due to edit deletion). Also contains the most recent txid
- * that the NameNode has sync'ed, so the client can determine how far behind in
- * the edit log it is.
- */
-@InterfaceAudience.Private
-public class EventsList {
-  private List<Event> events;
-  private long firstTxid;
-  private long lastTxid;
-  private long syncTxid;
-
-  public EventsList(List<Event> events, long firstTxid, long lastTxid,
-      long syncTxid) {
-    this.events = events;
-    this.firstTxid = firstTxid;
-    this.lastTxid = lastTxid;
-    this.syncTxid = syncTxid;
-  }
-
-  public List<Event> getEvents() {
-    return events;
-  }
-
-  public long getFirstTxid() {
-    return firstTxid;
-  }
-
-  public long getLastTxid() {
-    return lastTxid;
-  }
-
-  public long getSyncTxid() {
-    return syncTxid;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index d29d2eb..f3a390a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -1405,9 +1405,9 @@ public interface ClientProtocol {
   public long getCurrentEditLogTxid() throws IOException;
 
   /**
-   * Get an ordered list of events corresponding to the edit log transactions
-   * from txid onwards.
+   * Get an ordered list of batches of events corresponding to the edit log
+   * transactions for txids equal to or greater than txid.
    */
   @Idempotent
-  public EventsList getEditsFromTxid(long txid) throws IOException;
+  public EventBatchList getEditsFromTxid(long txid) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 077a3e9..7a2dd15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
@@ -1480,7 +1480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
   }
 
   @Override
-  public EventsList getEditsFromTxid(long txid) throws IOException {
+  public EventBatchList getEditsFromTxid(long txid) throws IOException {
     GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
         .setTxid(txid).build();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index ae9cb3e..c52588f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -2516,173 +2517,197 @@ public class PBHelper {
     }
   }
 
-  public static EventsList convert(GetEditsFromTxidResponseProto resp) throws
+  public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws
     IOException {
-    List<Event> events = Lists.newArrayList();
-    for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) {
-      switch(p.getType()) {
-      case EVENT_CLOSE:
-        InotifyProtos.CloseEventProto close =
-            InotifyProtos.CloseEventProto.parseFrom(p.getContents());
-        events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(),
-            close.getTimestamp()));
-        break;
-      case EVENT_CREATE:
-        InotifyProtos.CreateEventProto create =
-            InotifyProtos.CreateEventProto.parseFrom(p.getContents());
-        events.add(new Event.CreateEvent.Builder()
-            .iNodeType(createTypeConvert(create.getType()))
-            .path(create.getPath())
-            .ctime(create.getCtime())
-            .ownerName(create.getOwnerName())
-            .groupName(create.getGroupName())
-            .perms(convert(create.getPerms()))
-            .replication(create.getReplication())
-            .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
-            create.getSymlinkTarget())
-            .overwrite(create.getOverwrite()).build());
-        break;
-      case EVENT_METADATA:
-        InotifyProtos.MetadataUpdateEventProto meta =
-            InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
-        events.add(new Event.MetadataUpdateEvent.Builder()
-            .path(meta.getPath())
-            .metadataType(metadataUpdateTypeConvert(meta.getType()))
-            .mtime(meta.getMtime())
-            .atime(meta.getAtime())
-            .replication(meta.getReplication())
-            .ownerName(
-                meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
-            .groupName(
-                meta.getGroupName().isEmpty() ? null : meta.getGroupName())
-            .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
-            .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
-                meta.getAclsList()))
-            .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
-                meta.getXAttrsList()))
-            .xAttrsRemoved(meta.getXAttrsRemoved())
-            .build());
-        break;
-      case EVENT_RENAME:
-        InotifyProtos.RenameEventProto rename =
-            InotifyProtos.RenameEventProto.parseFrom(p.getContents());
-        events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(),
-            rename.getTimestamp()));
-        break;
-      case EVENT_APPEND:
-        InotifyProtos.AppendEventProto reopen =
-            InotifyProtos.AppendEventProto.parseFrom(p.getContents());
-        events.add(new Event.AppendEvent(reopen.getPath()));
-        break;
-      case EVENT_UNLINK:
-        InotifyProtos.UnlinkEventProto unlink =
-            InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
-        events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp()));
-        break;
-      default:
-        throw new RuntimeException("Unexpected inotify event type: " +
-            p.getType());
+    final InotifyProtos.EventsListProto list = resp.getEventsList();
+    final long firstTxid = list.getFirstTxid();
+    final long lastTxid = list.getLastTxid();
+
+    List<EventBatch> batches = Lists.newArrayList();
+    if (list.getEventsList().size() > 0) {
+      throw new IOException("Can't handle old inotify server response.");
+    }
+    for (InotifyProtos.EventBatchProto bp : list.getBatchList()) {
+      long txid = bp.getTxid();
+      if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) {
+        throw new IOException("Error converting TxidResponseProto: got a " +
+            "transaction id " + txid + " that was outside the range of [" +
+            firstTxid + ", " + lastTxid + "].");
+      }
+      List<Event> events = Lists.newArrayList();
+      for (InotifyProtos.EventProto p : bp.getEventsList()) {
+        switch (p.getType()) {
+          case EVENT_CLOSE:
+            InotifyProtos.CloseEventProto close =
+                InotifyProtos.CloseEventProto.parseFrom(p.getContents());
+            events.add(new Event.CloseEvent(close.getPath(),
+                close.getFileSize(), close.getTimestamp()));
+            break;
+          case EVENT_CREATE:
+            InotifyProtos.CreateEventProto create =
+                InotifyProtos.CreateEventProto.parseFrom(p.getContents());
+            events.add(new Event.CreateEvent.Builder()
+                .iNodeType(createTypeConvert(create.getType()))
+                .path(create.getPath())
+                .ctime(create.getCtime())
+                .ownerName(create.getOwnerName())
+                .groupName(create.getGroupName())
+                .perms(convert(create.getPerms()))
+                .replication(create.getReplication())
+                .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null :
+                    create.getSymlinkTarget())
+                .overwrite(create.getOverwrite()).build());
+            break;
+          case EVENT_METADATA:
+            InotifyProtos.MetadataUpdateEventProto meta =
+                InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents());
+            events.add(new Event.MetadataUpdateEvent.Builder()
+                .path(meta.getPath())
+                .metadataType(metadataUpdateTypeConvert(meta.getType()))
+                .mtime(meta.getMtime())
+                .atime(meta.getAtime())
+                .replication(meta.getReplication())
+                .ownerName(
+                    meta.getOwnerName().isEmpty() ? null : meta.getOwnerName())
+                .groupName(
+                    meta.getGroupName().isEmpty() ? null : meta.getGroupName())
+                .perms(meta.hasPerms() ? convert(meta.getPerms()) : null)
+                .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry(
+                    meta.getAclsList()))
+                .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs(
+                    meta.getXAttrsList()))
+                .xAttrsRemoved(meta.getXAttrsRemoved())
+                .build());
+            break;
+          case EVENT_RENAME:
+            InotifyProtos.RenameEventProto rename =
+                InotifyProtos.RenameEventProto.parseFrom(p.getContents());
+            events.add(new Event.RenameEvent(rename.getSrcPath(),
+                rename.getDestPath(), rename.getTimestamp()));
+            break;
+          case EVENT_APPEND:
+            InotifyProtos.AppendEventProto reopen =
+                InotifyProtos.AppendEventProto.parseFrom(p.getContents());
+            events.add(new Event.AppendEvent(reopen.getPath()));
+            break;
+          case EVENT_UNLINK:
+            InotifyProtos.UnlinkEventProto unlink =
+                InotifyProtos.UnlinkEventProto.parseFrom(p.getContents());
+            events.add(new Event.UnlinkEvent(unlink.getPath(),
+                unlink.getTimestamp()));
+            break;
+          default:
+            throw new RuntimeException("Unexpected inotify event type: " +
+                p.getType());
+        }
       }
+      batches.add(new EventBatch(txid, events.toArray(new Event[0])));
     }
-    return new EventsList(events, resp.getEventsList().getFirstTxid(),
+    return new EventBatchList(batches, resp.getEventsList().getFirstTxid(),
         resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid());
   }
 
-  public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) {
+  public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) {
     InotifyProtos.EventsListProto.Builder builder =
         InotifyProtos.EventsListProto.newBuilder();
-    for (Event e : el.getEvents()) {
-      switch(e.getEventType()) {
-      case CLOSE:
-        Event.CloseEvent ce = (Event.CloseEvent) e;
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_CLOSE)
-            .setContents(
-                InotifyProtos.CloseEventProto.newBuilder()
-                    .setPath(ce.getPath())
-                    .setFileSize(ce.getFileSize())
-                    .setTimestamp(ce.getTimestamp()).build().toByteString()
-            ).build());
-        break;
-      case CREATE:
-        Event.CreateEvent ce2 = (Event.CreateEvent) e;
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_CREATE)
-            .setContents(
-                InotifyProtos.CreateEventProto.newBuilder()
-                    .setType(createTypeConvert(ce2.getiNodeType()))
-                    .setPath(ce2.getPath())
-                    .setCtime(ce2.getCtime())
-                    .setOwnerName(ce2.getOwnerName())
-                    .setGroupName(ce2.getGroupName())
-                    .setPerms(convert(ce2.getPerms()))
-                    .setReplication(ce2.getReplication())
-                    .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
-                        "" : ce2.getSymlinkTarget())
-                    .setOverwrite(ce2.getOverwrite()).build().toByteString()
-            ).build());
-        break;
-      case METADATA:
-        Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
-        InotifyProtos.MetadataUpdateEventProto.Builder metaB =
-            InotifyProtos.MetadataUpdateEventProto.newBuilder()
-                .setPath(me.getPath())
-                .setType(metadataUpdateTypeConvert(me.getMetadataType()))
-                .setMtime(me.getMtime())
-                .setAtime(me.getAtime())
-                .setReplication(me.getReplication())
-                .setOwnerName(me.getOwnerName() == null ? "" :
-                    me.getOwnerName())
-                .setGroupName(me.getGroupName() == null ? "" :
-                    me.getGroupName())
-                .addAllAcls(me.getAcls() == null ?
-                    Lists.<AclEntryProto>newArrayList() :
-                    convertAclEntryProto(me.getAcls()))
-                .addAllXAttrs(me.getxAttrs() == null ?
-                    Lists.<XAttrProto>newArrayList() :
-                    convertXAttrProto(me.getxAttrs()))
-                .setXAttrsRemoved(me.isxAttrsRemoved());
-        if (me.getPerms() != null) {
-          metaB.setPerms(convert(me.getPerms()));
+    for (EventBatch b : el.getBatches()) {
+      List<InotifyProtos.EventProto> events = Lists.newArrayList();
+      for (Event e : b.getEvents()) {
+        switch (e.getEventType()) {
+          case CLOSE:
+            Event.CloseEvent ce = (Event.CloseEvent) e;
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_CLOSE)
+                .setContents(
+                    InotifyProtos.CloseEventProto.newBuilder()
+                        .setPath(ce.getPath())
+                        .setFileSize(ce.getFileSize())
+                        .setTimestamp(ce.getTimestamp()).build().toByteString()
+                ).build());
+            break;
+          case CREATE:
+            Event.CreateEvent ce2 = (Event.CreateEvent) e;
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_CREATE)
+                .setContents(
+                    InotifyProtos.CreateEventProto.newBuilder()
+                        .setType(createTypeConvert(ce2.getiNodeType()))
+                        .setPath(ce2.getPath())
+                        .setCtime(ce2.getCtime())
+                        .setOwnerName(ce2.getOwnerName())
+                        .setGroupName(ce2.getGroupName())
+                        .setPerms(convert(ce2.getPerms()))
+                        .setReplication(ce2.getReplication())
+                        .setSymlinkTarget(ce2.getSymlinkTarget() == null ?
+                            "" : ce2.getSymlinkTarget())
+                        .setOverwrite(ce2.getOverwrite()).build().toByteString()
+                ).build());
+            break;
+          case METADATA:
+            Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e;
+            InotifyProtos.MetadataUpdateEventProto.Builder metaB =
+                InotifyProtos.MetadataUpdateEventProto.newBuilder()
+                    .setPath(me.getPath())
+                    .setType(metadataUpdateTypeConvert(me.getMetadataType()))
+                    .setMtime(me.getMtime())
+                    .setAtime(me.getAtime())
+                    .setReplication(me.getReplication())
+                    .setOwnerName(me.getOwnerName() == null ? "" :
+                        me.getOwnerName())
+                    .setGroupName(me.getGroupName() == null ? "" :
+                        me.getGroupName())
+                    .addAllAcls(me.getAcls() == null ?
+                        Lists.<AclEntryProto>newArrayList() :
+                        convertAclEntryProto(me.getAcls()))
+                    .addAllXAttrs(me.getxAttrs() == null ?
+                        Lists.<XAttrProto>newArrayList() :
+                        convertXAttrProto(me.getxAttrs()))
+                    .setXAttrsRemoved(me.isxAttrsRemoved());
+            if (me.getPerms() != null) {
+              metaB.setPerms(convert(me.getPerms()));
+            }
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_METADATA)
+                .setContents(metaB.build().toByteString())
+                .build());
+            break;
+          case RENAME:
+            Event.RenameEvent re = (Event.RenameEvent) e;
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_RENAME)
+                .setContents(
+                    InotifyProtos.RenameEventProto.newBuilder()
+                        .setSrcPath(re.getSrcPath())
+                        .setDestPath(re.getDstPath())
+                        .setTimestamp(re.getTimestamp()).build().toByteString()
+                ).build());
+            break;
+          case APPEND:
+            Event.AppendEvent re2 = (Event.AppendEvent) e;
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_APPEND)
+                .setContents(
+                    InotifyProtos.AppendEventProto.newBuilder()
+                        .setPath(re2.getPath()).build().toByteString()
+                ).build());
+            break;
+          case UNLINK:
+            Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
+            events.add(InotifyProtos.EventProto.newBuilder()
+                .setType(InotifyProtos.EventType.EVENT_UNLINK)
+                .setContents(
+                    InotifyProtos.UnlinkEventProto.newBuilder()
+                        .setPath(ue.getPath())
+                        .setTimestamp(ue.getTimestamp()).build().toByteString()
+                ).build());
+            break;
+          default:
+            throw new RuntimeException("Unexpected inotify event: " + e);
         }
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_METADATA)
-            .setContents(metaB.build().toByteString())
-            .build());
-        break;
-      case RENAME:
-        Event.RenameEvent re = (Event.RenameEvent) e;
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_RENAME)
-            .setContents(
-                InotifyProtos.RenameEventProto.newBuilder()
-                    .setSrcPath(re.getSrcPath())
-                    .setDestPath(re.getDstPath())
-                    .setTimestamp(re.getTimestamp()).build().toByteString()
-            ).build());
-        break;
-      case APPEND:
-        Event.AppendEvent re2 = (Event.AppendEvent) e;
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_APPEND)
-            .setContents(
-                InotifyProtos.AppendEventProto.newBuilder()
-                    .setPath(re2.getPath()).build().toByteString()
-            ).build());
-        break;
-      case UNLINK:
-        Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
-        builder.addEvents(InotifyProtos.EventProto.newBuilder()
-            .setType(InotifyProtos.EventType.EVENT_UNLINK)
-            .setContents(
-                InotifyProtos.UnlinkEventProto.newBuilder()
-                    .setPath(ue.getPath())
-                    .setTimestamp(ue.getTimestamp()).build().toByteString()
-            ).build());
-        break;
-      default:
-        throw new RuntimeException("Unexpected inotify event: " + e);
       }
+      builder.addBatch(InotifyProtos.EventBatchProto.newBuilder().
+          setTxid(b.getTxid()).
+          addAllEvents(events));
     }
     builder.setFirstTxid(el.getFirstTxid());
     builder.setLastTxid(el.getLastTxid());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index 00a5f25..cd3fc23 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.protocol.Block;
 
 import java.util.List;
@@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator {
     return size;
   }
 
-  public static Event[] translate(FSEditLogOp op) {
+  public static EventBatch translate(FSEditLogOp op) {
     switch(op.opCode) {
     case OP_ADD:
       FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op;
       if (addOp.blocks.length == 0) { // create
-        return new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
+        return new EventBatch(op.txid,
+            new Event[] { new Event.CreateEvent.Builder().path(addOp.path)
             .ctime(addOp.atime)
             .replication(addOp.replication)
             .ownerName(addOp.permissions.getUserName())
             .groupName(addOp.permissions.getGroupName())
             .perms(addOp.permissions.getPermission())
             .overwrite(addOp.overwrite)
-            .iNodeType(Event.CreateEvent.INodeType.FILE).build() };
+            .iNodeType(Event.CreateEvent.INodeType.FILE).build() });
       } else {
-        return new Event[] { new Event.AppendEvent(addOp.path) };
+        return new EventBatch(op.txid,
+            new Event[] { new Event.AppendEvent(addOp.path) });
       }
     case OP_CLOSE:
       FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
-      return new Event[] {
-          new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) };
+      return new EventBatch(op.txid, new Event[] {
+          new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
     case OP_SET_REPLICATION:
       FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION)
           .path(setRepOp.path)
-          .replication(setRepOp.replication).build() };
+          .replication(setRepOp.replication).build() });
     case OP_CONCAT_DELETE:
       FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op;
       List<Event> events = Lists.newArrayList();
@@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator {
         events.add(new Event.UnlinkEvent(src, cdOp.timestamp));
       }
       events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp));
-      return events.toArray(new Event[0]);
+      return new EventBatch(op.txid, events.toArray(new Event[0]));
     case OP_RENAME_OLD:
       FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op;
-      return new Event[] {
-          new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) };
+      return new EventBatch(op.txid, new Event[] {
+          new Event.RenameEvent(rnOpOld.src,
+              rnOpOld.dst, rnOpOld.timestamp) });
     case OP_RENAME:
       FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op;
-      return new Event[] {
-          new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) };
+      return new EventBatch(op.txid, new Event[] {
+          new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) });
     case OP_DELETE:
       FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op;
-      return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) };
+      return new EventBatch(op.txid, new Event[] {
+          new Event.UnlinkEvent(delOp.path, delOp.timestamp) });
     case OP_MKDIR:
       FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op;
-      return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
+      return new EventBatch(op.txid,
+        new Event[] { new Event.CreateEvent.Builder().path(mkOp.path)
           .ctime(mkOp.timestamp)
           .ownerName(mkOp.permissions.getUserName())
           .groupName(mkOp.permissions.getGroupName())
           .perms(mkOp.permissions.getPermission())
-          .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() };
+          .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() });
     case OP_SET_PERMISSIONS:
       FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS)
           .path(permOp.src)
-          .perms(permOp.permissions).build() };
+          .perms(permOp.permissions).build() });
     case OP_SET_OWNER:
       FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER)
           .path(ownOp.src)
-          .ownerName(ownOp.username).groupName(ownOp.groupname).build() };
+          .ownerName(ownOp.username).groupName(ownOp.groupname).build() });
     case OP_TIMES:
       FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES)
           .path(timesOp.path)
-          .atime(timesOp.atime).mtime(timesOp.mtime).build() };
+          .atime(timesOp.atime).mtime(timesOp.mtime).build() });
     case OP_SYMLINK:
       FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op;
-      return new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
+      return new EventBatch(op.txid,
+        new Event[] { new Event.CreateEvent.Builder().path(symOp.path)
           .ctime(symOp.atime)
           .ownerName(symOp.permissionStatus.getUserName())
           .groupName(symOp.permissionStatus.getGroupName())
           .perms(symOp.permissionStatus.getPermission())
           .symlinkTarget(symOp.value)
-          .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() };
+          .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() });
     case OP_REMOVE_XATTR:
       FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
           .path(rxOp.src)
           .xAttrs(rxOp.xAttrs)
-          .xAttrsRemoved(true).build() };
+          .xAttrsRemoved(true).build() });
     case OP_SET_XATTR:
       FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS)
           .path(sxOp.src)
           .xAttrs(sxOp.xAttrs)
-          .xAttrsRemoved(false).build() };
+          .xAttrsRemoved(false).build() });
     case OP_SET_ACL:
       FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op;
-      return new Event[] { new Event.MetadataUpdateEvent.Builder()
+      return new EventBatch(op.txid,
+        new Event[] { new Event.MetadataUpdateEvent.Builder()
           .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS)
           .path(saOp.src)
-          .acls(saOp.aclEntries).build() };
+          .acls(saOp.aclEntries).build() });
     default:
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 690d7e1..16cec5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -34,7 +34,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -55,8 +54,8 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HealthCheckFailedException;
@@ -67,8 +66,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
-import org.apache.hadoop.hdfs.inotify.Event;
-import org.apache.hadoop.hdfs.inotify.EventsList;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AclException;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -139,10 +138,16 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ipc.WritableRpcEngine;
 import org.apache.hadoop.ipc.RefreshRegistry;
 import org.apache.hadoop.ipc.RefreshResponse;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.WritableRpcEngine;
+import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Groups;
@@ -155,19 +160,12 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP
 import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB;
 import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
-import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
 import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
 import org.apache.hadoop.tracing.SpanReceiverInfo;
-import org.apache.hadoop.tracing.TraceAdminPB;
 import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
 import org.apache.hadoop.tracing.TraceAdminProtocolPB;
 import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -175,6 +173,7 @@ import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionUtil;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
 
 /**
@@ -1670,7 +1669,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
-  public EventsList getEditsFromTxid(long txid) throws IOException {
+  public EventBatchList getEditsFromTxid(long txid) throws IOException {
     checkNNStartup();
     namesystem.checkOperation(OperationCategory.READ); // only active
     namesystem.checkSuperuserPrivilege();
@@ -1689,13 +1688,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
     // guaranteed to have been written by this NameNode.)
     boolean readInProgress = syncTxid > 0;
 
-    List<Event> events = Lists.newArrayList();
+    List<EventBatch> batches = Lists.newArrayList();
+    int totalEvents = 0;
     long maxSeenTxid = -1;
     long firstSeenTxid = -1;
 
     if (syncTxid > 0 && txid > syncTxid) {
       // we can't read past syncTxid, so there's no point in going any further
-      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+      return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
     }
 
     Collection<EditLogInputStream> streams = null;
@@ -1707,7 +1707,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // will result
       LOG.info("NN is transitioning from active to standby and FSEditLog " +
       "is closed -- could not read edits");
-      return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+      return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
     }
 
     boolean breakOuter = false;
@@ -1725,9 +1725,10 @@ class NameNodeRpcServer implements NamenodeProtocols {
             break;
           }
 
-          Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op);
-          if (eventsFromOp != null) {
-            events.addAll(Arrays.asList(eventsFromOp));
+          EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op);
+          if (eventBatch != null) {
+            batches.add(eventBatch);
+            totalEvents += eventBatch.getEvents().length;
           }
           if (op.getTransactionId() > maxSeenTxid) {
             maxSeenTxid = op.getTransactionId();
@@ -1735,7 +1736,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
           if (firstSeenTxid == -1) {
             firstSeenTxid = op.getTransactionId();
           }
-          if (events.size() >= maxEventsPerRPC || (syncTxid > 0 &&
+          if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 &&
               op.getTransactionId() == syncTxid)) {
             // we're done
             breakOuter = true;
@@ -1750,7 +1751,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       }
     }
 
-    return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid);
+    return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid);
   }
 
   @Override // TraceAdminProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index a1d4d92..e51c02c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -48,6 +48,11 @@ message EventProto {
   required bytes contents = 2;
 }
 
+message EventBatchProto {
+  required int64 txid = 1; // id of the edit-log transaction these events came from
+  repeated EventProto events = 2; // events produced by that one transaction
+}
+
 enum INodeType {
   I_TYPE_FILE = 0x0;
   I_TYPE_DIRECTORY = 0x1;
@@ -111,8 +116,9 @@ message UnlinkEventProto {
 }
 
 message EventsListProto {
-  repeated EventProto events = 1;
+  repeated EventProto events = 1; // deprecated: superseded by batch (field 5)
   required int64 firstTxid = 2;
   required int64 lastTxid = 3;
   required int64 syncTxid = 4;
-}
\ No newline at end of file
+  repeated EventBatchProto batch = 5; // events grouped per transaction id
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index a608ba8..82db110 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.inotify.Event;
+import org.apache.hadoop.hdfs.inotify.EventBatch;
 import org.apache.hadoop.hdfs.inotify.MissingEventsException;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
@@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream {
   private static final Log LOG = LogFactory.getLog(
       TestDFSInotifyEventInputStream.class);
 
-  private static Event waitForNextEvent(DFSInotifyEventInputStream eis)
+  private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis)
     throws IOException, MissingEventsException {
-    Event next = null;
-    while ((next = eis.poll()) == null);
-    return next;
+    EventBatch batch = null;
+    while ((batch = eis.poll()) == null); // spin until poll() returns a non-null batch
+    return batch;
+  }
+
+  private static long checkTxid(EventBatch batch, long prevTxid){
+    Assert.assertTrue("Previous txid " + prevTxid + " was not less than " +
+        "new txid " + batch.getTxid(), prevTxid < batch.getTxid());
+    return batch.getTxid(); // returned so the caller can pass it as the next prevTxid
   }
 
   /**
@@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream {
    */
   @Test
   public void testOpcodeCount() {
-    Assert.assertTrue(FSEditLogOpCodes.values().length == 47);
+    Assert.assertEquals(47, FSEditLogOpCodes.values().length);
   }
 
 
@@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream {
           "user::rwx,user:foo:rw-,group::r--,other::---", true));
       client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent
 
-      Event next = null;
+      EventBatch batch = null;
 
       // RenameOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
-      Event.RenameEvent re = (Event.RenameEvent) next;
-      Assert.assertTrue(re.getDstPath().equals("/file4"));
-      Assert.assertTrue(re.getSrcPath().equals("/file"));
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      long txid = batch.getTxid();
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0];
+      Assert.assertEquals("/file4", re.getDstPath());
+      Assert.assertEquals("/file", re.getSrcPath());
       Assert.assertTrue(re.getTimestamp() > 0);
 
-      long eventsBehind = eis.getEventsBehindEstimate();
+      long eventsBehind = eis.getTxidsBehindEstimate();
 
       // RenameOldOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.RENAME);
-      Event.RenameEvent re2 = (Event.RenameEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME);
+      Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0];
       Assert.assertTrue(re2.getDstPath().equals("/file2"));
       Assert.assertTrue(re2.getSrcPath().equals("/file4"));
       Assert.assertTrue(re.getTimestamp() > 0);
 
       // AddOp with overwrite
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-      Event.CreateEvent ce = (Event.CreateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0];
       Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE);
       Assert.assertTrue(ce.getPath().equals("/file2"));
       Assert.assertTrue(ce.getCtime() > 0);
@@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce.getOverwrite());
 
       // CloseOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
-      Event.CloseEvent ce2 = (Event.CloseEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0];
       Assert.assertTrue(ce2.getPath().equals("/file2"));
       Assert.assertTrue(ce2.getFileSize() > 0);
       Assert.assertTrue(ce2.getTimestamp() > 0);
 
       // AddOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
-      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
 
       // CloseOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
-      Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2"));
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE);
+      Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2"));
 
       // TimesOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue.getPath().equals("/file2"));
       Assert.assertTrue(mue.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.TIMES);
 
       // SetReplicationOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue2.getPath().equals("/file2"));
       Assert.assertTrue(mue2.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.REPLICATION);
       Assert.assertTrue(mue2.getReplication() == 1);
 
       // ConcatDeleteOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.APPEND);
-      Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2"));
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
-      Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(3, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
+      Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2"));
+      Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1];
       Assert.assertTrue(ue2.getPath().equals("/file3"));
       Assert.assertTrue(ue2.getTimestamp() > 0);
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE);
-      Event.CloseEvent ce3 = (Event.CloseEvent) next;
+      Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE);
+      Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2];
       Assert.assertTrue(ce3.getPath().equals("/file2"));
       Assert.assertTrue(ce3.getTimestamp() > 0);
 
       // DeleteOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK);
-      Event.UnlinkEvent ue = (Event.UnlinkEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK);
+      Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0];
       Assert.assertTrue(ue.getPath().equals("/file2"));
       Assert.assertTrue(ue.getTimestamp() > 0);
 
       // MkdirOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-      Event.CreateEvent ce4 = (Event.CreateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0];
       Assert.assertTrue(ce4.getiNodeType() ==
           Event.CreateEvent.INodeType.DIRECTORY);
       Assert.assertTrue(ce4.getPath().equals("/dir"));
@@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce4.getSymlinkTarget() == null);
 
       // SetPermissionsOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue3.getPath().equals("/dir"));
       Assert.assertTrue(mue3.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.PERMS);
       Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-"));
 
       // SetOwnerOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue4.getPath().equals("/dir"));
       Assert.assertTrue(mue4.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.OWNER);
@@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(mue4.getGroupName().equals("groupname"));
 
       // SymlinkOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-      Event.CreateEvent ce5 = (Event.CreateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+      Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0];
       Assert.assertTrue(ce5.getiNodeType() ==
           Event.CreateEvent.INodeType.SYMLINK);
       Assert.assertTrue(ce5.getPath().equals("/dir2"));
@@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir"));
 
       // SetXAttrOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue5.getPath().equals("/file5"));
       Assert.assertTrue(mue5.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(!mue5.isxAttrsRemoved());
 
       // RemoveXAttrOp
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue6.getPath().equals("/file5"));
       Assert.assertTrue(mue6.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.XATTRS);
@@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(mue6.isxAttrsRemoved());
 
       // SetAclOp (1)
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue7.getPath().equals("/file5"));
       Assert.assertTrue(mue7.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream {
           AclEntry.parseAclEntry("user::rwx", true)));
 
       // SetAclOp (2)
-      next = waitForNextEvent(eis);
-      Assert.assertTrue(next.getEventType() == Event.EventType.METADATA);
-      Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next;
+      batch = waitForNextEvents(eis);
+      Assert.assertEquals(1, batch.getEvents().length);
+      txid = checkTxid(batch, txid);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA);
+      Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0];
       Assert.assertTrue(mue8.getPath().equals("/file5"));
       Assert.assertTrue(mue8.getMetadataType() ==
           Event.MetadataUpdateEvent.MetadataType.ACLS);
@@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream {
       // and we should not have been behind at all when eventsBehind was set
       // either, since there were few enough events that they should have all
       // been read to the client during the first poll() call
-      Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind);
+      Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind);
 
     } finally {
       cluster.shutdown();
@@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream {
       }
       cluster.getDfsCluster().shutdownNameNode(0);
       cluster.getDfsCluster().transitionToActive(1);
-      Event next = null;
+      EventBatch batch = null;
       // we can read all of the edits logged by the old active from the new
       // active
       for (int i = 0; i < 10; i++) {
-        next = waitForNextEvent(eis);
-        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+        batch = waitForNextEvents(eis);
+        Assert.assertEquals(1, batch.getEvents().length);
+        Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
             i));
       }
       Assert.assertTrue(eis.poll() == null);
@@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream {
       // make sure that the old active can't read any further than the edits
       // it logged itself (it has no idea whether the in-progress edits from
       // the other writer have actually been committed)
-      Event next = null;
+      EventBatch batch = null;
       for (int i = 0; i < 10; i++) {
-        next = waitForNextEvent(eis);
-        Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-        Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" +
+        batch = waitForNextEvents(eis);
+        Assert.assertEquals(1, batch.getEvents().length);
+        Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+        Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" +
             i));
       }
       Assert.assertTrue(eis.poll() == null);
@@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream {
       }, 1, TimeUnit.SECONDS);
       // a very generous wait period -- the edit will definitely have been
       // processed by the time this is up
-      Event next = eis.poll(5, TimeUnit.SECONDS);
-      Assert.assertTrue(next != null);
-      Assert.assertTrue(next.getEventType() == Event.EventType.CREATE);
-      Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir"));
+      EventBatch batch = eis.poll(5, TimeUnit.SECONDS);
+      Assert.assertNotNull(batch);
+      Assert.assertEquals(1, batch.getEvents().length);
+      Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE);
+      Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath());
     } finally {
       cluster.shutdown();
     }
   }
-
 }


Mime
View raw message