flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pras...@apache.org
Subject svn commit: r1244133 - /incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Date Tue, 14 Feb 2012 17:32:01 GMT
Author: prasadm
Date: Tue Feb 14 17:32:01 2012
New Revision: 1244133

URL: http://svn.apache.org/viewvc?rev=1244133&view=rev
Log:
FLUME-963: Add additional tests to TestHDFSEventSink and demystify existing test

Brock Noland (via Prasad Mujumdar)

Modified:
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1244133&r1=1244132&r2=1244133&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Tue Feb 14 17:32:01 2012
@@ -17,10 +17,13 @@
  */
 package org.apache.flume.sink.hdfs;
 
+import static org.junit.Assert.*;
 import java.io.BufferedReader;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Calendar;
+import java.util.List;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -48,6 +51,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 public class TestHDFSEventSink {
 
   private HDFSEventSink sink;
@@ -146,7 +151,8 @@ public class TestHDFSEventSink {
     sink.start();
 
     Calendar eventDate = Calendar.getInstance();
-
+    List<String> bodies = Lists.newArrayList();
+    
     // push the event batches into channel
     for (i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
@@ -158,8 +164,9 @@ public class TestHDFSEventSink {
         event.getHeaders().put("timestamp",
             String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
-
-        event.setBody(("Test." + i + "." + j).getBytes());
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
         channel.put(event);
         totalEvents++;
       }
@@ -179,29 +186,7 @@ public class TestHDFSEventSink {
     // check that the roll happened correctly for the given data
     // Note that we'll end up with one last file with only header
     Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
-
-    try {
-      i = j = 1;
-      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
-        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
-        FSDataInputStream input = fs.open(filePath);
-        BufferedReader d = new BufferedReader(new InputStreamReader(input));
-        String line;
-
-        while ((line = d.readLine()) != null) {
-          Assert.assertEquals(("Test." + i + "." + j), line);
-          if (++j > txnMax) {
-            j = 1;
-            i++;
-          }
-        }
-        input.close();
-      }
-    } catch (IOException ioe) {
-      System.err.println("IOException during operation: " + ioe.toString());
-      return;
-    }
-    Assert.assertEquals(i, 4);
+    verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
   @Test
@@ -241,7 +226,8 @@ public class TestHDFSEventSink {
     sink.start();
 
     Calendar eventDate = Calendar.getInstance();
-
+    List<String> bodies = Lists.newArrayList();
+    
     // push the event batches into channel
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
@@ -253,8 +239,9 @@ public class TestHDFSEventSink {
         event.getHeaders().put("timestamp",
             String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
-
-        event.setBody(("Test." + i + "." + j).getBytes());
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
         channel.put(event);
         totalEvents++;
       }
@@ -274,32 +261,8 @@ public class TestHDFSEventSink {
     // check that the roll happened correctly for the given data
     // Note that we'll end up with one last file with only header
     Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
-
-    try {
-      i = j = 1;
-      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
-        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
-        LongWritable key = new LongWritable();
-        BytesWritable value = new BytesWritable();
-        BytesWritable expValue;
-
-        while (reader.next(key, value)) {
-          expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
-          Assert.assertEquals(expValue, value);
-          if (++j > txnMax) {
-            j = 1;
-            i++;
-          }
-        }
-        reader.close();
-      }
-    } catch (IOException ioe) {
-      System.err.println("IOException during operation: " + ioe.toString());
-      System.exit(1);
-    }
-    Assert.assertEquals(i, 4);
-
+    
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
   @Test
@@ -335,7 +298,7 @@ public class TestHDFSEventSink {
     sink.start();
 
     Calendar eventDate = Calendar.getInstance();
-
+    List<String> bodies = Lists.newArrayList();
     // push the event batches into channel
     for (int i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
@@ -347,8 +310,9 @@ public class TestHDFSEventSink {
         event.getHeaders().put("timestamp",
             String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
-
-        event.setBody(("Test." + i + "." + j).getBytes());
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
         channel.put(event);
       }
       txn.commit();
@@ -359,22 +323,7 @@ public class TestHDFSEventSink {
     }
 
     sink.stop();
-
-    /*
-     * 
-     * // loop through all the files generated and check their contains
-     * FileStatus[] dirStat = fs.listStatus(dirPath); Path fList[] =
-     * FileUtil.stat2Paths(dirStat);
-     * 
-     * try { for (int cnt = 0; cnt < fList.length; cnt++) { SequenceFile.Reader
-     * reader = new SequenceFile.Reader(fs, fList[cnt], conf); LongWritable key
-     * = new LongWritable(); BytesWritable value = new BytesWritable();
-     * 
-     * while (reader.next(key, value)) { logger.info(key+ ":" +
-     * value.toString()); } reader.close(); } } catch (IOException ioe) {
-     * System.err.println("IOException during operation: " + ioe.toString());
-     * System.exit(1); }
-     */
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
   // inject fault and make sure that the txn is rolled back and retried
@@ -420,6 +369,7 @@ public class TestHDFSEventSink {
 
     Calendar eventDate = Calendar.getInstance();
 
+    List<String> bodies = Lists.newArrayList();
     // push the event batches into channel
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
@@ -432,7 +382,9 @@ public class TestHDFSEventSink {
             String.valueOf(eventDate.getTimeInMillis()));
         event.getHeaders().put("hostname", "Host" + i);
 
-        event.setBody(("Test." + i + "." + j).getBytes());
+        String body = "Test." + i + "." + j;
+        event.setBody(body.getBytes());
+        bodies.add(body);
         // inject fault
         if ((totalEvents % 30) == 1) {
           event.getHeaders().put("fault-once", "");
@@ -442,15 +394,75 @@ public class TestHDFSEventSink {
       }
       txn.commit();
       txn.close();
-
-      // execute sink to process the events
-      sink.process();
+      
+      LOG.info("Process events: " + sink.process());
     }
-    LOG.info("clear any events pending due to errors");
-    // clear any events pending due to errors
-    sink.process();
-
+    LOG.info("Process events to end of transaction max: " + sink.process());
+    LOG.info("Process events to injected fault: " + sink.process());
+    LOG.info("Process events remaining events: " + sink.process());
     sink.stop();
+    verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
+    
+  }
+  
+  
+  private List<String> getAllFiles(String input) {
+    List<String> output = Lists.newArrayList();
+    File dir = new File(input);
+    if(dir.isFile()) {
+      output.add(dir.getAbsolutePath());
+    } else if(dir.isDirectory()) {
+      for(String file : dir.list()) {
+        File subDir = new File(dir, file);
+        output.addAll(getAllFiles(subDir.getAbsolutePath()));
+      }
+    }
+    return output;
+  }
+  
+  private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+    int found = 0;
+    int expected = bodies.size();
+    for(String outputFile : getAllFiles(dir)) {
+      String name = (new File(outputFile)).getName();
+      if(name.startsWith(prefix)) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputFile), conf);
+        LongWritable key = new LongWritable();
+        BytesWritable value = new BytesWritable();
+        while(reader.next(key, value)) {
+          String body = new String(value.getBytes(), 0, value.getLength());
+          bodies.remove(body);
+          found++;
+        }
+        reader.close();
+      }
+    }
+    assertTrue("Found = " + found + ", Expected = "  +
+        expected + ", Left = " + bodies.size() + " " + bodies, 
+          bodies.size() == 0);
+
+  }
+  
+  private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException {
+    int found = 0;
+    int expected = bodies.size();
+    for(String outputFile : getAllFiles(dir)) {
+      String name = (new File(outputFile)).getName();
+      if(name.startsWith(prefix)) {
+        FSDataInputStream input = fs.open(new Path(outputFile));
+        BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+        String body = null;
+        while((body = reader.readLine()) != null) {
+          bodies.remove(body);
+          found++;
+        }
+        reader.close();
+      }
+    }
+    assertTrue("Found = " + found + ", Expected = "  +
+        expected + ", Left = " + bodies.size() + " " + bodies, 
+          bodies.size() == 0);
+
   }
 
   /* 
@@ -529,7 +541,7 @@ public class TestHDFSEventSink {
    * append using slow sink writer with specified append timeout
    * verify that the data is written correctly to files
    */  
-  private void slowAppendTestHelper (long appendTimeout)  throws InterruptedException,
+  private void slowAppendTestHelper (long appendTimeout)  throws InterruptedException, IOException,
   LifecycleException, EventDeliveryException, IOException {
     final long txnMax = 2;
     final String fileName = "FlumeData";
@@ -602,28 +614,23 @@ public class TestHDFSEventSink {
     // Note that we'll end up with one last file with only header
     Assert.assertEquals((totalEvents / rollCount) + 1, fList.length);
 
-    try {
-      i = j = 1;
-      for (int cnt = 0; cnt < fList.length - 1; cnt++) {
-        Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
-        LongWritable key = new LongWritable();
-        BytesWritable value = new BytesWritable();
-        BytesWritable expValue;
-
-        while (reader.next(key, value)) {
-          expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
-          Assert.assertEquals(expValue, value);
-          if (++j > txnMax) {
-            j = 1;
-            i++;
-          }
+    i = j = 1;
+    for (int cnt = 0; cnt < fList.length - 1; cnt++) {
+      Path filePath = new Path(newPath + "/" + fileName + "." + cnt);
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+      LongWritable key = new LongWritable();
+      BytesWritable value = new BytesWritable();
+      BytesWritable expValue;
+
+      while (reader.next(key, value)) {
+        expValue = new BytesWritable(("Test." + i + "." + j).getBytes());
+        Assert.assertEquals(expValue, value);
+        if (++j > txnMax) {
+          j = 1;
+          i++;
         }
-        reader.close();
       }
-    } catch (IOException ioe) {
-      System.err.println("IOException during operation: " + ioe.toString());
-      System.exit(1);
+      reader.close();
     }
     Assert.assertEquals(1, i);
   }



Mime
View raw message