flume-commits mailing list archives

From hshreedha...@apache.org
Subject [1/2] flume git commit: FLUME-2591. DatasetSink 2.0
Date Thu, 29 Jan 2015 01:42:58 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk 0d6eccad2 -> 1d49ef704


http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
index c46d66c..58aa467 100644
--- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
+++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java
@@ -18,6 +18,8 @@
 
 package org.apache.flume.sink.kite;
 
+import org.apache.flume.sink.kite.parser.EntityParser;
+import org.apache.flume.sink.kite.policy.FailurePolicy;
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
@@ -29,6 +31,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -37,12 +40,14 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumWriter;
+import org.apache.avro.util.Utf8;
 import org.apache.commons.io.FileUtils;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -52,6 +57,7 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -64,9 +70,11 @@ import org.junit.Test;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetDescriptor;
 import org.kitesdk.data.DatasetReader;
+import org.kitesdk.data.DatasetWriter;
 import org.kitesdk.data.Datasets;
 import org.kitesdk.data.PartitionStrategy;
 import org.kitesdk.data.View;
+import static org.mockito.Mockito.*;
 
 public class TestDatasetSink {
 
@@ -74,6 +82,8 @@ public class TestDatasetSink {
   public static final String DATASET_NAME = "test";
   public static final String FILE_DATASET_URI =
       "dataset:file:target/test-repo/" + DATASET_NAME;
+  public static final String ERROR_DATASET_URI =
+      "dataset:file:target/test-repo/failed-events";
   public static final File SCHEMA_FILE = new File("target/record-schema.avsc");
   public static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
       "{\"type\":\"record\",\"name\":\"rec\",\"fields\":[" +
@@ -127,6 +137,7 @@ public class TestDatasetSink {
     Datasets.create(FILE_DATASET_URI, DESCRIPTOR);
 
     this.config = new Context();
+    config.put("keep-alive", "0");
     this.in = new MemoryChannel();
     Configurables.configure(in, config);
 
@@ -195,7 +206,7 @@ public class TestDatasetSink {
   }
 
   @Test
-  public void testFileStore() throws EventDeliveryException {
+  public void testFileStore() throws EventDeliveryException, NonRecoverableEventException {
     DatasetSink sink = sink(in, config);
 
     // run the sink
@@ -222,6 +233,19 @@ public class TestDatasetSink {
     // run the sink
     sink.start();
     sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+    // The records won't commit until the call to stop()
+    Assert.assertEquals("Should not have committed", 0, read(created).size());
+
     sink.stop();
 
     Assert.assertEquals(Sets.newHashSet(expected), read(created));
@@ -509,6 +533,376 @@ public class TestDatasetSink {
         expected.size() + 1, remaining(in));
   }
 
+  @Test
+  public void testFileStoreWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testMissingSchemaWithSavePolicy() throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    Event badEvent = new SimpleEvent();
+    badEvent.setHeaders(Maps.<String, String>newHashMap());
+    badEvent.setBody(serialize(expected.get(0), RECORD_SCHEMA));
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+          .setBody(ByteBuffer.wrap(badEvent.getBody()))
+          .setHeaders(toUtf8Map(badEvent.getHeaders()))
+          .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemasWithSavePolicy()
+      throws EventDeliveryException {
+    if (Datasets.exists(ERROR_DATASET_URI)) {
+      Datasets.delete(ERROR_DATASET_URI);
+    }
+    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
+        DatasetSinkConstants.SAVE_FAILURE_POLICY);
+    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
+        ERROR_DATASET_URI);
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    Event badEvent = event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true);
+    putToChannel(in, badEvent);
+
+    // run the sink
+    sink.start();
+    sink.process();
+    sink.stop();
+
+    Assert.assertEquals("Good records should have been written",
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    Assert.assertEquals("Should not have rolled back", 0, remaining(in));
+    Assert.assertEquals("Should have saved the bad event",
+        Sets.newHashSet(AvroFlumeEvent.newBuilder()
+          .setBody(ByteBuffer.wrap(badEvent.getBody()))
+          .setHeaders(toUtf8Map(badEvent.getHeaders()))
+          .build()),
+        read(Datasets.load(ERROR_DATASET_URI, AvroFlumeEvent.class)));
+  }
+
+  @Test
+  public void testSerializedWithIncompatibleSchemas() throws EventDeliveryException {
+    final DatasetSink sink = sink(in, config);
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(
+        INCOMPATIBLE_SCHEMA);
+    GenericData.Record rec = builder.set("username", "koala").build();
+
+    // We pass in a valid schema in the header, but an incompatible schema
+    // was used to serialize the record
+    putToChannel(in, event(rec, INCOMPATIBLE_SCHEMA, SCHEMA_FILE, true));
+
+    // run the sink
+    sink.start();
+    assertThrows("Should fail", EventDeliveryException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            sink.process();
+            return null;
+          }
+        });
+    sink.stop();
+
+    Assert.assertEquals("Should have rolled back",
+        expected.size() + 1, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatch() throws EventDeliveryException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should commit during the call to process
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+    // but the data won't be visible yet
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCommitOnBatchFalse() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // the transaction should not commit during the call to process
+    assertThrows("Transaction should still be open", IllegalStateException.class,
+        new Callable() {
+          @Override
+          public Object call() throws EventDeliveryException {
+            in.getTransaction().begin();
+            return null;
+          }
+        });
+
+    // the data won't be visible
+    Assert.assertEquals(0,
+        read(Datasets.load(FILE_DATASET_URI)).size());
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+    // the transaction should commit during the call to stop
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
+  @Test
+  public void testCommitOnBatchFalseSyncOnBatchTrue() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(true));
+
+    try {
+      sink(in, config);
+      Assert.fail("Should have thrown IllegalArgumentException");
+    } catch (IllegalArgumentException ex) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testCloseAndCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+    sink.createWriter();
+
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCloseWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.closeWriter();
+    sink.commitTransaction();
+
+    Assert.assertNull("Writer should be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(
+        Sets.newHashSet(expected),
+        read(Datasets.load(FILE_DATASET_URI)));
+  }
+
+  @Test
+  public void testCreateWriter() throws EventDeliveryException {
+    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
+        Boolean.toString(false));
+    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
+        Boolean.toString(false));
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    sink.commitTransaction();
+    sink.createWriter();
+    Assert.assertNotNull("Writer should not be null", sink.getWriter());
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+
+    sink.stop();
+
+    Assert.assertEquals(0, read(Datasets.load(FILE_DATASET_URI)).size());
+  }
+
+  @Test
+  public void testAppendWriteExceptionInvokesPolicy()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new DataFileWriter.AppendWriteException(new IOException()))
+        .when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+    sink.write(mockEvent);
+
+    // Verify that the event was sent to the failure policy
+    verify(mockFailurePolicy).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testRuntimeExceptionThrowsEventDeliveryException()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // Mock an Event
+    Event mockEvent = mock(Event.class);
+    when(mockEvent.getBody()).thenReturn(new byte[] { 0x01 });
+
+    // Mock a GenericRecord
+    GenericRecord mockRecord = mock(GenericRecord.class);
+
+    // Mock an EntityParser
+    EntityParser<GenericRecord> mockParser = mock(EntityParser.class);
+    when(mockParser.parse(eq(mockEvent), any(GenericRecord.class)))
+        .thenReturn(mockRecord);
+    sink.setParser(mockParser);
+
+    // Mock a FailurePolicy
+    FailurePolicy mockFailurePolicy = mock(FailurePolicy.class);
+    sink.setFailurePolicy(mockFailurePolicy);
+
+    // Mock a DatasetWriter
+    DatasetWriter<GenericRecord> mockWriter = mock(DatasetWriter.class);
+    doThrow(new RuntimeException()).when(mockWriter).write(mockRecord);
+
+    sink.setWriter(mockWriter);
+
+    try {
+      sink.write(mockEvent);
+      Assert.fail("Should throw EventDeliveryException");
+    } catch (EventDeliveryException ex) {
+      // expected
+    }
+
+    // Verify that the event was not sent to the failure policy
+    verify(mockFailurePolicy, never()).handle(eq(mockEvent), any(Throwable.class));
+
+    sink.stop();
+  }
+
+  @Test
+  public void testProcessHandlesNullWriter()
+      throws EventDeliveryException, NonRecoverableEventException {
+    DatasetSink sink = sink(in, config);
+
+    // run the sink
+    sink.start();
+    sink.process();
+
+    // explicitly set the writer to null
+    sink.setWriter(null);
+
+    // this should not throw an NPE
+    sink.process();
+
+    sink.stop();
+
+    Assert.assertEquals("Should have committed", 0, remaining(in));
+  }
+
   public static DatasetSink sink(Channel in, Context config) {
     DatasetSink sink = new DatasetSink();
     sink.setChannel(in);
@@ -621,4 +1015,19 @@ public class TestDatasetSink {
       Assert.assertEquals(message, expected, actual.getClass());
     }
   }
+
+  /**
+   * Helper function to convert a map of Strings to a map of Utf8s.
+   *
+   * @param map A Map of String to String
+   * @return The same mappings with the {@code String}s converted to {@link Utf8}s
+   */
+  public static Map<CharSequence, CharSequence> toUtf8Map(
+      Map<String, String> map) {
+    Map<CharSequence, CharSequence> utf8Map = Maps.newHashMap();
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      utf8Map.put(new Utf8(entry.getKey()), new Utf8(entry.getValue()));
+    }
+    return utf8Map;
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1d49ef70/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d4f01a..1350fa4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@ limitations under the License.
 
     <hadoop2.version>2.4.0</hadoop2.version>
     <thrift.version>0.7.0</thrift.version>
-    <kite.version>0.15.0</kite.version>
+    <kite.version>0.17.1</kite.version>
     <hive.version>0.10.0</hive.version>
   </properties>
 


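For context, a minimal sketch of wiring a DatasetSink with the options these tests exercise, mirroring what the tests' sink(in, config) helper and setUp do. The constants for the failure policy and the batch commit/sync behaviour appear in the diff above; the dataset-URI key constant (CONFIG_KITE_DATASET_URI) does not appear there and is assumed here, so verify it against DatasetSinkConstants before relying on it.

import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.sink.kite.DatasetSink;
import org.apache.flume.sink.kite.DatasetSinkConstants;

public class DatasetSinkWiringSketch {

  public static DatasetSink buildSink() {
    Context config = new Context();

    // Target dataset; this constant name is assumed, check DatasetSinkConstants.
    config.put(DatasetSinkConstants.CONFIG_KITE_DATASET_URI,
        "dataset:file:target/test-repo/test");

    // Save events that fail to parse or write into a side dataset
    // instead of failing the whole batch (see testFileStoreWithSavePolicy).
    config.put(DatasetSinkConstants.CONFIG_FAILURE_POLICY,
        DatasetSinkConstants.SAVE_FAILURE_POLICY);
    config.put(DatasetSinkConstants.CONFIG_KITE_ERROR_DATASET_URI,
        "dataset:file:target/test-repo/failed-events");

    // Defer the channel commit and dataset sync to stop() rather than
    // performing them on every process() call (see testCommitOnBatchFalse).
    config.put(DatasetSinkConstants.CONFIG_FLUSHABLE_COMMIT_ON_BATCH,
        Boolean.toString(false));
    config.put(DatasetSinkConstants.CONFIG_SYNCABLE_SYNC_ON_BATCH,
        Boolean.toString(false));

    // Channel configured from the same Context, as in the test setUp.
    MemoryChannel channel = new MemoryChannel();
    Configurables.configure(channel, config);

    DatasetSink sink = new DatasetSink();
    sink.setChannel(channel);
    Configurables.configure(sink, config);
    return sink;
  }
}

With commit-on-batch and sync-on-batch disabled as above, records only become durable and the channel transaction only commits when the sink is stopped, which is what testCommitOnBatchFalse asserts.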