flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1689: BodyTextSerializer should allow an option not to add a newline to each serialized event
Date Fri, 09 Nov 2012 19:55:55 GMT
Updated Branches:
  refs/heads/flume-1.4 ddd0a9365 -> 9dade66ae


FLUME-1689: BodyTextSerializer should allow an option not to add a newline to each serialized
event

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9dade66a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9dade66a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9dade66a

Branch: refs/heads/flume-1.4
Commit: 9dade66ae867a0df1b9c1327c62a9e26e41a625c
Parents: ddd0a93
Author: Brock Noland <brock@apache.org>
Authored: Fri Nov 9 13:53:05 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Fri Nov 9 13:55:08 2012 -0600

----------------------------------------------------------------------
 .../serialization/BodyTextEventSerializer.java     |   23 +++---
 .../serialization/TestBodyTextEventSerializer.java |   42 ++++++++--
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   58 +++++++++++++++
 3 files changed, 103 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
index dc291cd..d09f02d 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/BodyTextEventSerializer.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.io.OutputStream;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.conf.Configurable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,23 +29,24 @@ import org.slf4j.LoggerFactory;
  * This class simply writes the body of the event to the output stream
  * and appends a newline after each event.
  */
-public class BodyTextEventSerializer implements EventSerializer, Configurable {
+public class BodyTextEventSerializer implements EventSerializer {
 
   private final static Logger logger =
       LoggerFactory.getLogger(BodyTextEventSerializer.class);
 
+  // for legacy reasons, by default, append a newline to each event written out
+  private final String APPEND_NEWLINE = "appendNewline";
+  private final boolean APPEND_NEWLINE_DFLT = true;
+
   private final OutputStream out;
+  private final boolean appendNewline;
 
-  private BodyTextEventSerializer(OutputStream out) {
+  private BodyTextEventSerializer(OutputStream out, Context ctx) {
+    this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
     this.out = out;
   }
 
   @Override
-  public void configure(Context context) {
-    // noop
-  }
-
-  @Override
   public boolean supportsReopen() {
     return true;
   }
@@ -69,7 +69,9 @@ public class BodyTextEventSerializer implements EventSerializer, Configurable {
   @Override
   public void write(Event e) throws IOException {
     out.write(e.getBody());
-    out.write('\n');
+    if (appendNewline) {
+      out.write('\n');
+    }
   }
 
   @Override
@@ -81,8 +83,7 @@ public class BodyTextEventSerializer implements EventSerializer, Configurable {
 
     @Override
     public EventSerializer build(Context context, OutputStream out) {
-      BodyTextEventSerializer s = new BodyTextEventSerializer(out);
-      s.configure(context);
+      BodyTextEventSerializer s = new BodyTextEventSerializer(out, context);
       return s;
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
index e3cb255..b1a6c13 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
@@ -27,6 +27,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStream;
 import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Test;
@@ -34,13 +35,14 @@ import org.junit.Test;
 public class TestBodyTextEventSerializer {
 
   File testFile = new File("src/test/resources/events.txt");
+  File expectedFile = new File("src/test/resources/events.txt");
 
   @Test
-  public void test() throws FileNotFoundException, IOException {
+  public void testWithNewline() throws FileNotFoundException, IOException {
 
     OutputStream out = new FileOutputStream(testFile);
     EventSerializer serializer =
-        EventSerializerFactory.getInstance("text", null, out);
+        EventSerializerFactory.getInstance("text", new Context(), out);
     serializer.afterCreate();
     serializer.write(EventBuilder.withBody("event 1", Charsets.UTF_8));
     serializer.write(EventBuilder.withBody("event 2", Charsets.UTF_8));
@@ -51,14 +53,36 @@ public class TestBodyTextEventSerializer {
     out.close();
 
     BufferedReader reader = new BufferedReader(new FileReader(testFile));
-    String line;
-    int num = 0;
-    while ((line = reader.readLine()) != null) {
-      System.out.println(line);
-      num++;
-    }
+    Assert.assertEquals("event 1", reader.readLine());
+    Assert.assertEquals("event 2", reader.readLine());
+    Assert.assertEquals("event 3", reader.readLine());
+    Assert.assertNull(reader.readLine());
 
-    Assert.assertEquals(3, num);
+    FileUtils.forceDelete(testFile);
+  }
+
+  @Test
+  public void testNoNewline() throws FileNotFoundException, IOException {
+
+    OutputStream out = new FileOutputStream(testFile);
+    Context context = new Context();
+    context.put("appendNewline", "false");
+    EventSerializer serializer =
+        EventSerializerFactory.getInstance("text", context, out);
+    serializer.afterCreate();
+    serializer.write(EventBuilder.withBody("event 1\n", Charsets.UTF_8));
+    serializer.write(EventBuilder.withBody("event 2\n", Charsets.UTF_8));
+    serializer.write(EventBuilder.withBody("event 3\n", Charsets.UTF_8));
+    serializer.flush();
+    serializer.beforeClose();
+    out.flush();
+    out.close();
+
+    BufferedReader reader = new BufferedReader(new FileReader(testFile));
+    Assert.assertEquals("event 1", reader.readLine());
+    Assert.assertEquals("event 2", reader.readLine());
+    Assert.assertEquals("event 3", reader.readLine());
+    Assert.assertNull(reader.readLine());
 
     FileUtils.forceDelete(testFile);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/9dade66a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index ab2c923..a9590ec 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1866,6 +1866,64 @@ Custom Sink Processor
 
 Custom sink processors are not supported at the moment.
 
+Event Serializers
+-----------------
+
+The ``FILE_ROLL`` sink and the ``HDFS`` sink both support the
+``EventSerializer`` interface. Details of the EventSerializers that ship with
+Flume are provided below.
+
+Body Text Serializer
+~~~~~~~~~~~~~~~~~~~~
+
+Alias: ``TEXT``. This serializer writes the body of the event to an output
+stream without any transformation or modification. The event headers are
+ignored. Configuration options are as follows:
+
+=========================  ================  ===========================================================================
+Property Name              Default           Description
+=========================  ================  ===========================================================================
+appendNewline              true              Whether a newline will be appended to each event at write time. The default
+                                             of true assumes that events do not contain newlines, for legacy reasons.
+=========================  ================  ===========================================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sinks = fileSink-1
+  agent_foo.sinks.fileSink-1.type = FILE_ROLL
+  agent_foo.sinks.fileSink-1.channel = memoryChannel-1
+  agent_foo.sinks.fileSink-1.sink.directory = /var/log/flume
+  agent_foo.sinks.fileSink-1.sink.serializer = TEXT
+  agent_foo.sinks.fileSink-1.sink.serializer.appendNewline = false
+
+Avro Event Serializer
+~~~~~~~~~~~~~~~~~~~~~
+
+Alias: ``AVRO_EVENT``. This serializer writes Flume events into an Avro
+container file. The schema used is the same schema used for Flume events
+in the Avro RPC mechanism. This serializer inherits from the
+``AbstractAvroEventSerializer`` class. Configuration options are as follows:
+
+==========================  ================  ===========================================================================
+Property Name               Default           Description
+==========================  ================  ===========================================================================
+syncIntervalBytes           2048000           Avro sync interval, in approximate bytes.
+compressionCodec            null              Avro compression codec. For supported codecs, see Avro's CodecFactory docs.
+==========================  ================  ===========================================================================
+
+Example for agent named **agent_foo**:
+
+.. code-block:: properties
+
+  agent_foo.sinks.hdfsSink-1.type = hdfs
+  agent_foo.sinks.hdfsSink-1.channel = memoryChannel-1
+  agent_foo.sinks.hdfsSink-1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
+  agent_foo.sinks.hdfsSink-1.serializer = AVRO_EVENT
+  agent_foo.sinks.hdfsSink-1.serializer.compressionCodec = snappy
+
+
 Flume Interceptors
 ------------------
 


Mime
View raw message