flume-commits mailing list archives

From br...@apache.org
Subject git commit: FLUME-1631: Retire hdfs.txnEventMax in HDFS sink
Date Tue, 16 Oct 2012 19:28:54 GMT
Updated Branches:
  refs/heads/trunk 0b59252e7 -> d4dde03d1


FLUME-1631: Retire hdfs.txnEventMax in HDFS sink

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d4dde03d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d4dde03d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d4dde03d

Branch: refs/heads/trunk
Commit: d4dde03d1ff9e19796d9385e8f4a3e87311302f7
Parents: 0b59252
Author: Brock Noland <brock@apache.org>
Authored: Tue Oct 16 14:27:40 2012 -0500
Committer: Brock Noland <brock@apache.org>
Committed: Tue Oct 16 14:27:40 2012 -0500

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   11 +--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   33 +++----
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   71 +++++++--------
 3 files changed, 51 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
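
A note for anyone applying this change to an existing agent: hdfs.batchSize
(whose default moves from 1 to 100) now bounds both the number of events taken
per channel transaction and the number written before a flush, so
configurations that previously tuned hdfs.txnEventMax should tune
hdfs.batchSize instead. A minimal sketch of the new-style configuration, using
the same Context idiom as the tests in this commit (the path and value are
illustrative, not prescribed by the patch):

    import org.apache.flume.Context;
    import org.apache.flume.sink.hdfs.HDFSEventSink;

    public class BatchSizeMigrationExample {
      public static void main(String[] args) {
        Context context = new Context();
        context.put("hdfs.path", "hdfs://namenode/flume/webdata/");
        // hdfs.txnEventMax is retired; hdfs.batchSize alone now caps both
        // the per-transaction take count and the flush interval.
        context.put("hdfs.batchSize", "100");
        HDFSEventSink sink = new HDFSEventSink();
        sink.configure(context); // rejects batchSize <= 0 via Preconditions
      }
    }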


http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 953a670..e5f7581 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -984,23 +984,21 @@ Name                    Default       Description
 **channel**             --
 **type**                --            The component type name, needs to be ``hdfs``
 **hdfs.path**           --            HDFS directory path (eg hdfs://namenode/flume/webdata/)
-hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in hdfs directory
 hdfs.rollInterval       30            Number of seconds to wait before rolling current file
                                       (0 = never roll based on time interval)
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
 hdfs.rollCount          10            Number of events written to file before it rolled
                                       (0 = never roll based on number of events)
-hdfs.batchSize          1             number of events written to file before it flushed to HDFS
-hdfs.txnEventMax        100
+hdfs.batchSize          100           Number of events written to file before it is flushed to HDFS
 hdfs.codeC              --            Compression codec. one of following : gzip, bzip2, lzo, snappy
 hdfs.fileType           SequenceFile  File format: currently ``SequenceFile``, ``DataStream`` or ``CompressedStream``
                                       (1)DataStream will not compress output file and please don't set codeC
                                       (2)CompressedStream requires set hdfs.codeC with an available codeC
-hdfs.maxOpenFiles       5000
+hdfs.maxOpenFiles       5000          Allow only this number of open files. If this number is exceeded, the oldest file is closed.
 hdfs.writeFormat        --            "Text" or "Writable"
-hdfs.appendTimeout      1000
-hdfs.callTimeout        10000
+hdfs.callTimeout        10000         Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.
+                                      This number should be increased if many HDFS operations are timing out.
 hdfs.threadsPoolSize    10            Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
 hdfs.rollTimerPoolSize  1             Number of threads per HDFS sink for scheduling timed file rolling
 hdfs.kerberosPrincipal  --            Kerberos user principal for accessing secure HDFS
@@ -1008,6 +1006,7 @@ hdfs.kerberosKeytab     --            Kerberos keytab for accessing secure HDFS
 hdfs.round              false         Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
 hdfs.roundValue         1             Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
 hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
+hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 serializer              ``TEXT``      Other possible options include ``AVRO_EVENT`` or the
                                       fully-qualified class name of an implementation of the
                                       ``EventSerializer.Builder`` interface.

http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 5ec9eb8..a6d624b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -69,7 +69,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final long defaultRollSize = 1024;
   private static final long defaultRollCount = 10;
   private static final String defaultFileName = "FlumeData";
-  private static final long defaultBatchSize = 1;
+  private static final long defaultBatchSize = 100;
   private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
@@ -101,7 +101,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private long rollInterval;
   private long rollSize;
   private long rollCount;
-  private long txnEventMax;
   private long batchSize;
   private int threadsPoolSize;
   private int rollTimerPoolSize;
@@ -185,7 +184,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
-    txnEventMax = context.getLong("hdfs.txnEventMax", defaultTxnEventMax);
     String codecName = context.getString("hdfs.codeC");
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
@@ -201,8 +199,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
     Preconditions.checkArgument(batchSize > 0,
         "batchSize must be greater than 0");
-    Preconditions.checkArgument(txnEventMax > 0,
-        "txnEventMax must be greater than 0");
     if (codecName == null) {
       codeC = null;
       compType = CompressionType.NONE;
@@ -368,11 +364,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
   }
   /**
-   * Pull events out of channel and send it to HDFS - take at the most
-   * txnEventMax, that's the maximum #events to hold in channel for a given
-   * transaction - find the corresponding bucket for the event, ensure the file
-   * is open - extract the pay-load and append to HDFS file <br />
-   * WARNING: NOT THREAD SAFE
+   * Pull events out of the channel and send them to HDFS. Take at most
+   * batchSize events per transaction. Find the corresponding bucket for
+   * each event, ensure the file is open, then serialize the data and
+   * write it to the file on HDFS. <br/>
+   * This method is not thread safe.
    */
   @Override
   public Status process() throws EventDeliveryException {
@@ -381,10 +377,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     List<BucketWriter> writers = Lists.newArrayList();
     transaction.begin();
     try {
-      Event event = null;
       int txnEventCount = 0;
-      for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
-        event = channel.take();
+      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
         if (event == null) {
           break;
         }
@@ -418,7 +413,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       if (txnEventCount == 0) {
         sinkCounter.incrementBatchEmptyCount();
-      } else if (txnEventCount == txnEventMax) {
+      } else if (txnEventCount == batchSize) {
         sinkCounter.incrementBatchCompleteCount();
       } else {
         sinkCounter.incrementBatchUnderflowCount();
@@ -431,14 +426,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       transaction.commit();
 
-      if (txnEventCount > 0) {
-        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
-      }
-
-      if(event == null) {
+      if (txnEventCount < 1) {
         return Status.BACKOFF;
+      } else {
+        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+        return Status.READY;
       }
-      return Status.READY;
     } catch (IOException eIO) {
       transaction.rollback();
       LOG.warn("HDFS IO error", eIO);
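
Summarizing the process() rewrite above: each call now takes up to batchSize
events inside one channel transaction, counts a complete batch only when the
cap was reached, and backs off only when the channel yielded nothing. A
simplified, self-contained sketch of that control flow (the channel is stubbed
with a queue and bucket writing is elided; this is not the actual sink code):

    import java.util.ArrayDeque;
    import java.util.Queue;

    public class BatchLoopSketch {
      enum Status { READY, BACKOFF }

      static Status processOnce(Queue<String> channel, long batchSize) {
        int txnEventCount = 0;
        // transaction.begin() elided
        while (txnEventCount < batchSize) {
          String event = channel.poll();   // stands in for channel.take()
          if (event == null) {
            break;                         // channel drained before the cap
          }
          txnEventCount++;                 // bucket lookup and append elided
        }
        // writer flush and transaction.commit() elided
        return txnEventCount < 1 ? Status.BACKOFF : Status.READY;
      }

      public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>();
        channel.add("e1");
        channel.add("e2");
        System.out.println(processOnce(channel, 100)); // READY (underflow batch)
        System.out.println(processOnce(channel, 100)); // BACKOFF (empty channel)
      }
    }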

http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index ba30d01..fee4c8b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -75,7 +75,9 @@ public class TestHDFSEventSink {
     try {
       FileSystem fs = FileSystem.get(conf);
       Path dirPath = new Path(testPath);
-      fs.delete(dirPath, true);
+      if (fs.exists(dirPath)) {
+        fs.delete(dirPath, true);
+      }
     } catch (IOException eIO) {
       LOG.warn("IO Error in test cleanup", eIO);
     }
@@ -105,13 +107,11 @@ public class TestHDFSEventSink {
       dirCleanup();
   }
 
-
   @Test
   public void testTextBatchAppend() throws InterruptedException, LifecycleException,
       EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 2;
     final long rollCount = 10;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -131,7 +131,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.rollInterval", "0");
     context.put("hdfs.rollSize", "0");
@@ -151,10 +150,10 @@ public class TestHDFSEventSink {
     List<String> bodies = Lists.newArrayList();
 
     // push the event batches into channel to roll twice
-    for (i = 1; i <= rollCount*2/txnMax; i++) {
+    for (i = 1; i <= rollCount*2/batchSize; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -178,7 +177,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     // check the contents of the all files
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
@@ -222,7 +224,6 @@ public class TestHDFSEventSink {
   public void testKerbFileAccess() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting testKerbFileAccess() ...");
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -239,7 +240,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.kerberosPrincipal", kerbConfPrincipal);
@@ -264,7 +264,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -284,7 +283,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
@@ -305,7 +303,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -332,7 +330,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -341,7 +342,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -361,7 +361,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
@@ -383,7 +382,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -410,7 +409,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -419,7 +421,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -439,7 +440,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
 
@@ -458,7 +458,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -485,7 +485,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -494,7 +497,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -511,7 +513,6 @@ public class TestHDFSEventSink {
     context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.timeZone", "UTC");
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
 
@@ -529,7 +530,7 @@ public class TestHDFSEventSink {
     for (int i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (int j = 1; j <= txnMax; j++) {
+      for (int j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -558,7 +559,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -581,7 +581,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -601,7 +600,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -741,7 +740,6 @@ public class TestHDFSEventSink {
 
     LOG.debug("Starting...");
     final int numBatches = 4;
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -762,7 +760,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -781,7 +778,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       channel.getTransaction().begin();
       try {
-        for (j = 1; j <= txnMax; j++) {
+        for (j = 1; j <= batchSize; j++) {
           Event event = new SimpleEvent();
           eventDate.clear();
           eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -816,7 +813,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
 
     LOG.debug("Starting...");
-    final long txnMax = 2;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -838,7 +834,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -857,7 +852,7 @@ public class TestHDFSEventSink {
     for (i = 0; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -887,7 +882,6 @@ public class TestHDFSEventSink {
    */
   private void slowAppendTestHelper (long appendTimeout)  throws InterruptedException, IOException,
   LifecycleException, EventDeliveryException, IOException {
-    final long txnMax = 2;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -910,7 +904,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -929,7 +922,7 @@ public class TestHDFSEventSink {
     for (i = 0; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -958,8 +951,10 @@ public class TestHDFSEventSink {
 
     // check that the roll happened correctly for the given data
     // Note that we'll end up with two files with only a head
-    Assert.assertEquals((totalEvents / rollCount) +1 , fList.length);
-
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
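
A side note on the repeated test assertion above: computing totalEvents /
rollCount and bumping the result when there is a remainder is just ceiling
division, so the four copies could share one helper along these lines (the
helper name is hypothetical and assumes rollCount > 0):

    // Hypothetical helper: integer ceiling of totalEvents / rollCount,
    // equivalent to the divide-then-bump pattern in the assertions above.
    static long expectedFiles(long totalEvents, long rollCount) {
      return (totalEvents + rollCount - 1) / rollCount;
    }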
 

