flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject [3/3] FLUME-1487. FileChannel format needs to be extensible.
Date Fri, 07 Sep 2012 21:52:59 GMT
http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index d09ddde..0173390 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -18,61 +18,152 @@
  */
 package org.apache.flume.channel.file;
 
-import com.google.common.collect.SetMultimap;
 import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
+import com.google.common.io.Files;
 
+@RunWith(value = Parameterized.class)
 public class TestFlumeEventQueue {
-
-  File file;
-  File inflightTakes;
-  File inflightPuts;
   FlumeEventPointer pointer1 = new FlumeEventPointer(1, 1);
   FlumeEventPointer pointer2 = new FlumeEventPointer(2, 2);
   FlumeEventQueue queue;
+  EventQueueBackingStoreSupplier backingStoreSupplier;
+  EventQueueBackingStore backingStore;
+
+  private abstract static class EventQueueBackingStoreSupplier {
+    File baseDir;
+    File checkpoint;
+    File inflightTakes;
+    File inflightPuts;
+    EventQueueBackingStoreSupplier() {
+      baseDir = Files.createTempDir();
+      checkpoint = new File(baseDir, "checkpoint");
+      inflightTakes = new File(baseDir, "inflightputs");
+      inflightPuts =  new File(baseDir, "inflighttakes");
+    }
+    File getCheckpoint() {
+      return checkpoint;
+    }
+    File getInflightPuts() {
+      return inflightPuts;
+    }
+    File getInflightTakes() {
+      return inflightTakes;
+    }
+    void delete() {
+      FileUtils.deleteQuietly(baseDir);
+    }
+    abstract EventQueueBackingStore get() throws Exception ;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() throws IOException {
+    Object[][] data = new Object[][] { {
+      new EventQueueBackingStoreSupplier() {
+        @Override
+        public EventQueueBackingStore get() throws IOException {
+          Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+          return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
+              "test");
+        }
+      }
+    }, {
+      new EventQueueBackingStoreSupplier() {
+        @Override
+        public EventQueueBackingStore get() throws IOException {
+          Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
+          return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000,
+              "test");
+        }
+      }
+    } };
+    return Arrays.asList(data);
+  }
+
+  public TestFlumeEventQueue(EventQueueBackingStoreSupplier backingStoreSupplier) {
+    this.backingStoreSupplier = backingStoreSupplier;
+  }
   @Before
   public void setup() throws Exception {
-    file = File.createTempFile("Checkpoint", "");
-    inflightTakes = File.createTempFile("inflighttakes", "");
-    inflightPuts = File.createTempFile("inflightputs", "");
+    backingStore = backingStoreSupplier.get();
   }
-  @Test
-  public void testQueueIsEmptyAfterCreation() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
-    Assert.assertNull(queue.removeHead(0));
+  @After
+  public void cleanup() throws IOException {
+    if(backingStore != null) {
+      backingStore.close();
+    }
+    backingStoreSupplier.delete();
   }
   @Test
   public void testCapacity() throws Exception {
-    queue = new FlumeEventQueue(1, file, inflightTakes, inflightPuts,"test");
+    backingStore.close();
+    File checkpoint = backingStoreSupplier.getCheckpoint();
+    Assert.assertTrue(checkpoint.delete());
+    backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertFalse(queue.addTail(pointer2));
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityZero() throws Exception {
-    queue = new FlumeEventQueue(0, file, inflightTakes, inflightPuts,"test");
+    backingStore.close();
+    File checkpoint = backingStoreSupplier.getCheckpoint();
+    Assert.assertTrue(checkpoint.delete());
+    backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
   }
   @Test(expected=IllegalArgumentException.class)
   public void testInvalidCapacityNegative() throws Exception {
-    queue = new FlumeEventQueue(-1, file, inflightTakes, inflightPuts,"test");
+    backingStore.close();
+    File checkpoint = backingStoreSupplier.getCheckpoint();
+    Assert.assertTrue(checkpoint.delete());
+    backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
+  }
+  @Test
+  public void testQueueIsEmptyAfterCreation() throws Exception {
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
+    Assert.assertNull(queue.removeHead(0L));
   }
   @Test
   public void addTail1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(pointer1, queue.removeHead(0));
     Assert.assertEquals(Sets.newHashSet(), queue.getFileIDs());
   }
   @Test
   public void addTail2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -81,7 +172,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addTailLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -98,7 +191,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHead1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertEquals(pointer1, queue.removeHead(0));
@@ -106,7 +201,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHead2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertEquals(Sets.newHashSet(1, 2), queue.getFileIDs());
@@ -115,7 +212,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addHeadLarge() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     int size = 500;
     Set<Integer> fileIDs = Sets.newHashSet();
     for (int i = 1; i <= size; i++) {
@@ -132,7 +231,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void addTailRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertEquals(Sets.newHashSet(1), queue.getFileIDs());
     Assert.assertTrue(queue.remove(pointer1));
@@ -143,7 +244,9 @@ public class TestFlumeEventQueue {
 
   @Test
   public void addTailRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addTail(pointer1));
     Assert.assertTrue(queue.addTail(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -152,14 +255,18 @@ public class TestFlumeEventQueue {
 
   @Test
   public void addHeadRemove1() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     queue.addHead(pointer1);
     Assert.assertTrue(queue.remove(pointer1));
     Assert.assertNull(queue.removeHead(0));
   }
   @Test
   public void addHeadRemove2() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     Assert.assertTrue(queue.addHead(pointer1));
     Assert.assertTrue(queue.addHead(pointer2));
     Assert.assertTrue(queue.remove(pointer1));
@@ -167,7 +274,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void testWrappingCorrectly() throws Exception {
-    queue = new FlumeEventQueue(1000, file, inflightTakes, inflightPuts,"test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     int size = Integer.MAX_VALUE;
     for (int i = 1; i <= size; i++) {
       if(!queue.addHead(new FlumeEventPointer(i, i))) {
@@ -187,7 +296,9 @@ public class TestFlumeEventQueue {
   }
   @Test
   public void testInflightPuts() throws Exception{
-    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addWithoutCommit(new FlumeEventPointer(1, 1), txnID1);
@@ -195,7 +306,9 @@ public class TestFlumeEventQueue {
     queue.addWithoutCommit(new FlumeEventPointer(2, 2), txnID2);
     queue.checkpoint(true);
     TimeUnit.SECONDS.sleep(3L);
-    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightPuts();
     Assert.assertTrue(deserializedMap.get(
             txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
@@ -207,7 +320,9 @@ public class TestFlumeEventQueue {
 
   @Test
   public void testInflightTakes() throws Exception {
-    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
     long txnID1 = new Random().nextInt(Integer.MAX_VALUE - 1);
     long txnID2 = txnID1 + 1;
     queue.addTail(new FlumeEventPointer(1, 1));
@@ -218,8 +333,10 @@ public class TestFlumeEventQueue {
     queue.removeHead(txnID2);
     queue.checkpoint(true);
     TimeUnit.SECONDS.sleep(3L);
-    queue = new FlumeEventQueue(10, file, inflightTakes, inflightPuts, "test");
-        SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
+    queue = new FlumeEventQueue(backingStore,
+        backingStoreSupplier.getInflightTakes(),
+        backingStoreSupplier.getInflightPuts());
+    SetMultimap<Long, Long> deserializedMap = queue.deserializeInflightTakes();
     Assert.assertTrue(deserializedMap.get(
             txnID1).contains(new FlumeEventPointer(1, 1).toLong()));
     Assert.assertTrue(deserializedMap.get(

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index e923a30..e3eb184 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -100,8 +100,8 @@ public class TestLog {
         }
       }
     }
-    // 67 files with TestLog.MAX_FILE_SIZE=1000
-    Assert.assertEquals(78, logCount);
+    // 78 (*2 for meta) files with TestLog.MAX_FILE_SIZE=1000
+    Assert.assertEquals(156, logCount);
   }
   /**
    * After replay of the log, we should find the event because the put

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 193cd2b..11d0be0 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -21,7 +21,9 @@ package org.apache.flume.channel.file;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -34,7 +36,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 
@@ -51,7 +52,7 @@ public class TestLogFile {
     dataDir = Files.createTempDir();
     dataFile = new File(dataDir, String.valueOf(fileID));
     Assert.assertTrue(dataDir.isDirectory());
-    logFileWriter = new LogFile.Writer(dataFile, fileID, 1000);
+    logFileWriter = LogFileFactory.getWriter(dataFile, fileID, 1000);
   }
   @After
   public void cleanup() throws IOException {
@@ -63,20 +64,25 @@ public class TestLogFile {
   }
   @Test
   public void testPutGet() throws InterruptedException, IOException {
-    final List<Throwable> errors = Lists.newArrayList();
+    final List<Throwable> errors =
+        Collections.synchronizedList(new ArrayList<Throwable>());
     ExecutorService executorService = Executors.newFixedThreadPool(10);
-    final LogFile.RandomReader logFileReader = new LogFile.RandomReader(dataFile);
+    final LogFile.RandomReader logFileReader =
+        LogFileFactory.getRandomReader(dataFile);
     for (int i = 0; i < 1000; i++) {
       // first try and throw failures
-      for(Throwable throwable : errors) {
-        Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
-      }
-      // then throw errors
-      for(Throwable throwable : errors) {
-        Throwables.propagate(throwable);
+      synchronized (errors) {
+        for(Throwable throwable : errors) {
+          Throwables.propagateIfInstanceOf(throwable, AssertionError.class);
+        }
+        // then throw errors
+        for(Throwable throwable : errors) {
+          Throwables.propagate(throwable);
+        }
       }
       final FlumeEvent eventIn = TestUtils.newPersistableEvent();
-      final Put put = new Put(++transactionID, eventIn);
+      final Put put = new Put(++transactionID, WriteOrderOracle.next(),
+          eventIn);
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       final int offset = ptr.getOffset();
@@ -110,12 +116,14 @@ public class TestLogFile {
     Map<Integer, Put> puts = Maps.newHashMap();
     for (int i = 0; i < 1000; i++) {
       FlumeEvent eventIn = TestUtils.newPersistableEvent();
-      Put put = new Put(++transactionID, eventIn);
+      Put put = new Put(++transactionID, WriteOrderOracle.next(),
+          eventIn);
       ByteBuffer bytes = TransactionEventRecord.toByteBuffer(put);
       FlumeEventPointer ptr = logFileWriter.put(bytes);
       puts.put(ptr.getOffset(), put);
     }
-    LogFile.SequentialReader reader = new LogFile.SequentialReader(dataFile);
+    LogFile.SequentialReader reader =
+        LogFileFactory.getSequentialReader(dataFile);
     LogRecord entry;
     while((entry = reader.next()) != null) {
       Integer offset = entry.getOffset();

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java
index 9f6adc7..04b6ea9 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogRecord.java
@@ -33,8 +33,10 @@ public class TestLogRecord {
   @Test
   public void testConstructor() {
     long now = System.currentTimeMillis();
-    Commit commit = new Commit(now);
+    Commit commit = new Commit(now, now + 1);
     LogRecord logRecord = new LogRecord(1, 2, commit);
+    Assert.assertTrue(now == commit.getTransactionID());
+    Assert.assertTrue(now + 1 == commit.getLogWriteOrderID());
     Assert.assertTrue(1 == logRecord.getFileID());
     Assert.assertTrue(2 == logRecord.getOffset());
     Assert.assertTrue(commit == logRecord.getEvent());
@@ -46,8 +48,7 @@ public class TestLogRecord {
     long now = System.currentTimeMillis();
     List<LogRecord> records = Lists.newArrayList();
     for (int i = 0; i < 3; i++) {
-      Commit commit = new Commit((long)i);
-      commit.setLogWriteOrderID(now - i);
+      Commit commit = new Commit((long)i, now - i);
       LogRecord logRecord = new LogRecord(1, i, commit);
       records.add(logRecord);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java
deleted file mode 100644
index c73b11b..0000000
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecord.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.file;
-
-import static org.mockito.Mockito.*;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-
-import junit.framework.Assert;
-
-import org.junit.Test;
-
-public class TestTransactionEventRecord {
-
-  @Test
-  public void testTypes() throws IOException {
-    Put put = new Put(System.currentTimeMillis());
-    Assert.assertEquals(TransactionEventRecord.Type.PUT.get(), put.getRecordType());
-
-    Take take = new Take(System.currentTimeMillis());
-    Assert.assertEquals(TransactionEventRecord.Type.TAKE.get(), take.getRecordType());
-
-    Rollback rollback = new Rollback(System.currentTimeMillis());
-    Assert.assertEquals(TransactionEventRecord.Type.ROLLBACK.get(), rollback.getRecordType());
-
-    Commit commit = new Commit(System.currentTimeMillis());
-    Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(), commit.getRecordType());
-  }
-
-  @Test
-  public void testPutSerialization() throws IOException {
-    Put in = new Put(System.currentTimeMillis(),
-        new FlumeEvent(new HashMap<String, String>(), new byte[0]));
-    in.setLogWriteOrderID(System.currentTimeMillis());
-    Put out = (Put)TransactionEventRecord.fromDataInput(toDataInput(in));
-    Assert.assertEquals(in.getClass(), out.getClass());
-    Assert.assertEquals(in.getRecordType(), out.getRecordType());
-    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
-    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
-    Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders());
-    Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
-  }
-  @Test
-  public void testTakeSerialization() throws IOException {
-    Take in = new Take(System.currentTimeMillis(), 10, 20);
-    in.setLogWriteOrderID(System.currentTimeMillis());
-    Take out = (Take)TransactionEventRecord.fromDataInput(toDataInput(in));
-    Assert.assertEquals(in.getClass(), out.getClass());
-    Assert.assertEquals(in.getRecordType(), out.getRecordType());
-    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
-    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
-    Assert.assertEquals(in.getFileID(), out.getFileID());
-    Assert.assertEquals(in.getOffset(), out.getOffset());
-  }
-
-  @Test
-  public void testRollbackSerialization() throws IOException {
-    Rollback in = new Rollback(System.currentTimeMillis());
-    in.setLogWriteOrderID(System.currentTimeMillis());
-    Rollback out = (Rollback)TransactionEventRecord.fromDataInput(toDataInput(in));
-    Assert.assertEquals(in.getClass(), out.getClass());
-    Assert.assertEquals(in.getRecordType(), out.getRecordType());
-    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
-    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
-  }
-
-  @Test
-  public void testCommitSerialization() throws IOException {
-    Commit in = new Commit(System.currentTimeMillis());
-    in.setLogWriteOrderID(System.currentTimeMillis());
-    Commit out = (Commit)TransactionEventRecord.fromDataInput(toDataInput(in));
-    Assert.assertEquals(in.getClass(), out.getClass());
-    Assert.assertEquals(in.getRecordType(), out.getRecordType());
-    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
-    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
-  }
-
-  @Test
-  public void testBadHeader() throws IOException {
-    Put in = new Put(System.currentTimeMillis(),
-        new FlumeEvent(new HashMap<String, String>(), new byte[0]));
-    in.setLogWriteOrderID(System.currentTimeMillis());
-    try {
-      TransactionEventRecord.fromDataInput(toDataInput(0, in));
-      Assert.fail();
-    } catch (IOException e) {
-      Assert.assertEquals("Header 0 is not the required value: deadbeef",
-          e.getMessage());
-    }
-  }
-
-  @Test
-  public void testBadType() throws IOException {
-    TransactionEventRecord in = mock(TransactionEventRecord.class);
-    when(in.getRecordType()).thenReturn(Short.MIN_VALUE);
-    try {
-      TransactionEventRecord.fromDataInput(toDataInput(in));
-      Assert.fail();
-    } catch(NullPointerException e) {
-      Assert.assertEquals("Unknown action ffff8000", e.getMessage());
-    }
-  }
-
-  private DataInput toDataInput(TransactionEventRecord record) throws IOException {
-    ByteBuffer buffer = TransactionEventRecord.toByteBuffer(record);
-    ByteArrayInputStream byteInput = new ByteArrayInputStream(buffer.array());
-    DataInputStream dataInput = new DataInputStream(byteInput);
-    return dataInput;
-  }
-  private DataInput toDataInput(int header, TransactionEventRecord record) throws IOException {
-    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
-    DataOutputStream dataOutput = new DataOutputStream(byteOutput);
-    dataOutput.writeInt(header);
-    dataOutput.writeShort(record.getRecordType());
-    dataOutput.writeLong(record.getTransactionID());
-    record.write(dataOutput);
-    ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
-    DataInputStream dataInput = new DataInputStream(byteInput);
-    return dataInput;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
new file mode 100644
index 0000000..2356d90
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV2.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+@SuppressWarnings("deprecation")
+public class TestTransactionEventRecordV2 {
+
+  @Test
+  public void testTypes() throws IOException {
+    Put put = new Put(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(TransactionEventRecord.Type.PUT.get(),
+        put.getRecordType());
+
+    Take take = new Take(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(TransactionEventRecord.Type.TAKE.get(),
+        take.getRecordType());
+
+    Rollback rollback = new Rollback(System.currentTimeMillis(),
+        WriteOrderOracle.next());
+    Assert.assertEquals(TransactionEventRecord.Type.ROLLBACK.get(),
+        rollback.getRecordType());
+
+    Commit commit = new Commit(System.currentTimeMillis(),
+        WriteOrderOracle.next());
+    Assert.assertEquals(TransactionEventRecord.Type.COMMIT.get(),
+        commit.getRecordType());
+  }
+
+  @Test
+  public void testPutSerialization() throws IOException {
+    Put in = new Put(System.currentTimeMillis(),
+        WriteOrderOracle.next(),
+        new FlumeEvent(new HashMap<String, String>(), new byte[0]));
+    Put out = (Put)TransactionEventRecord.fromDataInputV2(toDataInput(in));
+    Assert.assertEquals(in.getClass(), out.getClass());
+    Assert.assertEquals(in.getRecordType(), out.getRecordType());
+    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
+    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
+    Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders());
+    Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
+  }
+  @Test
+  public void testTakeSerialization() throws IOException {
+    Take in = new Take(System.currentTimeMillis(),
+        WriteOrderOracle.next(), 10, 20);
+    Take out = (Take)TransactionEventRecord.fromDataInputV2(toDataInput(in));
+    Assert.assertEquals(in.getClass(), out.getClass());
+    Assert.assertEquals(in.getRecordType(), out.getRecordType());
+    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
+    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
+    Assert.assertEquals(in.getFileID(), out.getFileID());
+    Assert.assertEquals(in.getOffset(), out.getOffset());
+  }
+
+  @Test
+  public void testRollbackSerialization() throws IOException {
+    Rollback in = new Rollback(System.currentTimeMillis(),
+        WriteOrderOracle.next());
+    Rollback out = (Rollback)TransactionEventRecord.fromDataInputV2(toDataInput(in));
+    Assert.assertEquals(in.getClass(), out.getClass());
+    Assert.assertEquals(in.getRecordType(), out.getRecordType());
+    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
+    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
+  }
+
+  @Test
+  public void testCommitSerialization() throws IOException {
+    Commit in = new Commit(System.currentTimeMillis(),
+        WriteOrderOracle.next());
+    Commit out = (Commit)TransactionEventRecord.fromDataInputV2(toDataInput(in));
+    Assert.assertEquals(in.getClass(), out.getClass());
+    Assert.assertEquals(in.getRecordType(), out.getRecordType());
+    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
+    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
+  }
+
+  @Test
+  public void testBadHeader() throws IOException {
+    Put in = new Put(System.currentTimeMillis(),
+        WriteOrderOracle.next(),
+        new FlumeEvent(new HashMap<String, String>(), new byte[0]));
+    try {
+      TransactionEventRecord.fromDataInputV2(toDataInput(0, in));
+      Assert.fail();
+    } catch (IOException e) {
+      Assert.assertEquals("Header 0 is not the required value: deadbeef",
+          e.getMessage());
+    }
+  }
+
+  @Test
+  public void testBadType() throws IOException {
+    TransactionEventRecord in = mock(TransactionEventRecord.class);
+    when(in.getRecordType()).thenReturn(Short.MIN_VALUE);
+    try {
+      TransactionEventRecord.fromDataInputV2(toDataInput(in));
+      Assert.fail();
+    } catch(NullPointerException e) {
+      Assert.assertEquals("Unknown action ffff8000", e.getMessage());
+    }
+  }
+
+  private DataInput toDataInput(TransactionEventRecord record) throws IOException {
+    ByteBuffer buffer = TransactionEventRecord.toByteBufferV2(record);
+    ByteArrayInputStream byteInput = new ByteArrayInputStream(buffer.array());
+    DataInputStream dataInput = new DataInputStream(byteInput);
+    return dataInput;
+  }
+  private DataInput toDataInput(int header, TransactionEventRecord record) throws IOException {
+    ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
+    DataOutputStream dataOutput = new DataOutputStream(byteOutput);
+    dataOutput.writeInt(header);
+    dataOutput.writeShort(record.getRecordType());
+    dataOutput.writeLong(record.getTransactionID());
+    dataOutput.writeLong(record.getLogWriteOrderID());
+    record.write(dataOutput);
+    ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
+    DataInputStream dataInput = new DataInputStream(byteInput);
+    return dataInput;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
new file mode 100644
index 0000000..a9866a0
--- /dev/null
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.file;
+
+import static org.mockito.Mockito.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+public class TestTransactionEventRecordV3 {
+
+  /** Each record class must report the matching Type constant. */
+  @Test
+  public void testTypes() throws IOException {
+    Put putRecord = new Put(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(
+        TransactionEventRecord.Type.PUT.get(), putRecord.getRecordType());
+
+    Take takeRecord = new Take(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(
+        TransactionEventRecord.Type.TAKE.get(), takeRecord.getRecordType());
+
+    Rollback rollbackRecord =
+        new Rollback(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(
+        TransactionEventRecord.Type.ROLLBACK.get(), rollbackRecord.getRecordType());
+
+    Commit commitRecord =
+        new Commit(System.currentTimeMillis(), WriteOrderOracle.next());
+    Assert.assertEquals(
+        TransactionEventRecord.Type.COMMIT.get(), commitRecord.getRecordType());
+  }
+
+  /** Round-trips a Put that carries one header and an empty body. */
+  @Test
+  public void testPutSerialization() throws IOException {
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("key", "value");
+    Put in = new Put(System.currentTimeMillis(), WriteOrderOracle.next(),
+        new FlumeEvent(headers, new byte[0]));
+    Put out = (Put) TransactionEventRecord.fromInputStream(toInputStream(in));
+    assertCommonFieldsEqual(in, out);
+    Assert.assertEquals(in.getEvent().getHeaders(), out.getEvent().getHeaders());
+    Assert.assertEquals(headers, in.getEvent().getHeaders());
+    Assert.assertEquals(headers, out.getEvent().getHeaders());
+    Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
+  }
+
+  /** A Put built with null headers must come back with a non-null map. */
+  @Test
+  public void testPutSerializationNullHeader() throws IOException {
+    Put in = new Put(System.currentTimeMillis(), WriteOrderOracle.next(),
+        new FlumeEvent(null, new byte[0]));
+    Put out = (Put) TransactionEventRecord.fromInputStream(toInputStream(in));
+    assertCommonFieldsEqual(in, out);
+    Assert.assertNull(in.getEvent().getHeaders());
+    Assert.assertNotNull(out.getEvent().getHeaders());
+    Assert.assertTrue(Arrays.equals(in.getEvent().getBody(), out.getEvent().getBody()));
+  }
+
+  /** Round-trips a Take and also compares its file ID and offset. */
+  @Test
+  public void testTakeSerialization() throws IOException {
+    Take in = new Take(System.currentTimeMillis(), WriteOrderOracle.next(), 10, 20);
+    Take out = (Take) TransactionEventRecord.fromInputStream(toInputStream(in));
+    assertCommonFieldsEqual(in, out);
+    Assert.assertEquals(in.getFileID(), out.getFileID());
+    Assert.assertEquals(in.getOffset(), out.getOffset());
+  }
+
+  /** Round-trips a Rollback and compares the fields shared by all records. */
+  @Test
+  public void testRollbackSerialization() throws IOException {
+    Rollback in = new Rollback(System.currentTimeMillis(), WriteOrderOracle.next());
+    Rollback out = (Rollback) TransactionEventRecord.fromInputStream(toInputStream(in));
+    assertCommonFieldsEqual(in, out);
+  }
+
+  /** Round-trips a Commit and compares the fields shared by all records. */
+  @Test
+  public void testCommitSerialization() throws IOException {
+    Commit in = new Commit(System.currentTimeMillis(), WriteOrderOracle.next());
+    Commit out = (Commit) TransactionEventRecord.fromInputStream(toInputStream(in));
+    assertCommonFieldsEqual(in, out);
+  }
+
+  /** Expects an NPE with a fixed message for a mocked record of type Short.MIN_VALUE. */
+  @Test
+  public void testBadType() throws IOException {
+    TransactionEventRecord in = mock(TransactionEventRecord.class);
+    when(in.getRecordType()).thenReturn(Short.MIN_VALUE);
+    try {
+      TransactionEventRecord.fromInputStream(toInputStream(in));
+      // No exception was thrown, so the test must fail.
+      Assert.fail();
+    } catch (NullPointerException e) {
+      Assert.assertEquals("Unknown action ffff8000", e.getMessage());
+    }
+  }
+
+  /** Compares class, record type, transaction ID and log write order ID. */
+  private static void assertCommonFieldsEqual(TransactionEventRecord in,
+      TransactionEventRecord out) {
+    Assert.assertEquals(in.getClass(), out.getClass());
+    Assert.assertEquals(in.getRecordType(), out.getRecordType());
+    Assert.assertEquals(in.getTransactionID(), out.getTransactionID());
+    Assert.assertEquals(in.getLogWriteOrderID(), out.getLogWriteOrderID());
+  }
+
+  /** Serializes the record and exposes the buffer's backing array as a stream. */
+  private InputStream toInputStream(TransactionEventRecord record) throws IOException {
+    ByteBuffer serialized = TransactionEventRecord.toByteBuffer(record);
+    return new ByteArrayInputStream(serialized.array());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index e64f856..48948e4 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -18,27 +18,34 @@
  */
 package org.apache.flume.channel.file;
 
-import com.google.common.base.Charsets;
+import static org.fest.reflect.core.Reflection.*;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.net.URL;
 import java.util.Map;
-
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import java.util.Set;
 import java.util.UUID;
+import java.util.zip.GZIPInputStream;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.Transaction;
 import org.apache.flume.event.EventBuilder;
+import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
-import static org.fest.reflect.core.Reflection.*;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Resources;
 
 public class TestUtils {
 
@@ -162,4 +169,29 @@ public class TestUtils {
     }
     return result;
   }
+  /**
+   * Decompresses a gzip-compressed classpath resource into a file.
+   *
+   * @param resource name of the gzip-compressed resource, looked up with
+   *        Resources.getResource
+   * @param output file that receives the decompressed bytes
+   * @throws IOException if the resource cannot be read or the file cannot
+   *         be written
+   */
+  public static void copyDecompressed(String resource, File output)
+      throws IOException {
+    URL input =  Resources.getResource(resource);
+    GZIPInputStream in = new GZIPInputStream(input.openStream());
+    try {
+      FileOutputStream out = new FileOutputStream(output);
+      try {
+        ByteStreams.copy(in, out);
+      } finally {
+        out.close();
+      }
+    } finally {
+      // Close explicitly: ByteStreams.copy leaves both streams open.
+      in.close();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/810dfe28/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index aaf944f..1c267e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -673,6 +673,13 @@ limitations under the License.
       </dependency>
 
       <dependency>
+        <groupId>com.google.protobuf</groupId>
+        <artifactId>protobuf-java</artifactId>
+        <scope>compile</scope>
+        <version>2.4.1</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.mortbay.jetty</groupId>
         <artifactId>servlet-api</artifactId>
         <version>2.5-20110124</version>


Mime
View raw message