flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject git commit: FLUME-1770: Flume should have serializer which supports serializer the headers to a simple string
Date Thu, 20 Dec 2012 21:27:29 GMT
Updated Branches:
  refs/heads/trunk 5fdf673d1 -> 1675d49a2


FLUME-1770: Flume should have serializer which supports serializer the headers to a simple
string

(Thom DeCarlo via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1675d49a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1675d49a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1675d49a

Branch: refs/heads/trunk
Commit: 1675d49a217a81753c67f48b56de543934ce117b
Parents: 5fdf673
Author: Brock Noland <brock@apache.org>
Authored: Thu Dec 20 15:26:52 2012 -0600
Committer: Brock Noland <brock@apache.org>
Committed: Thu Dec 20 15:26:52 2012 -0600

----------------------------------------------------------------------
 .../flume/serialization/EventSerializerType.java   |    1 +
 .../HeaderAndBodyTextEventSerializer.java          |   93 +++++++++++++
 .../serialization/TestBodyTextEventSerializer.java |    2 +
 .../TestHeaderAndBodyTextEventSerializer.java      |  103 +++++++++++++++
 4 files changed, 199 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1675d49a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventSerializerType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventSerializerType.java
b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventSerializerType.java
index 450b96f..86d243f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/EventSerializerType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/EventSerializerType.java
@@ -25,6 +25,7 @@ import org.apache.flume.annotations.InterfaceStability;
 @InterfaceStability.Unstable
 public enum EventSerializerType {
   TEXT(BodyTextEventSerializer.Builder.class),
+  HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.class),
   AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.class),
   OTHER(null);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/1675d49a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
new file mode 100644
index 0000000..9c6003c
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/HeaderAndBodyTextEventSerializer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.serialization;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class simply writes the body of the event to the output stream
+ * and appends a newline after each event.
+ */
+public class HeaderAndBodyTextEventSerializer implements EventSerializer {
+
+  private final static Logger logger =
+      LoggerFactory.getLogger(HeaderAndBodyTextEventSerializer.class);
+
+  // for legacy reasons, by default, append a newline to each event written out
+  private final String APPEND_NEWLINE = "appendNewline";
+  private final boolean APPEND_NEWLINE_DFLT = true;
+
+  private final OutputStream out;
+  private final boolean appendNewline;
+
+  private HeaderAndBodyTextEventSerializer(OutputStream out, Context ctx) {
+    this.appendNewline = ctx.getBoolean(APPEND_NEWLINE, APPEND_NEWLINE_DFLT);
+    this.out = out;
+  }
+
+  @Override
+  public boolean supportsReopen() {
+    return true;
+  }
+
+  @Override
+  public void afterCreate() {
+    // noop
+  }
+
+  @Override
+  public void afterReopen() {
+    // noop
+  }
+
+  @Override
+  public void beforeClose() {
+    // noop
+  }
+
+  @Override
+  public void write(Event e) throws IOException {
+    out.write((e.getHeaders() + " ").getBytes());
+    out.write(e.getBody());
+    if (appendNewline) {
+      out.write('\n');
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // noop
+  }
+
+  public static class Builder implements EventSerializer.Builder {
+
+    @Override
+    public EventSerializer build(Context context, OutputStream out) {
+      HeaderAndBodyTextEventSerializer s = new HeaderAndBodyTextEventSerializer(out, context);
+      return s;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/1675d49a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
index b1a6c13..b8f8953 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestBodyTextEventSerializer.java
@@ -57,6 +57,7 @@ public class TestBodyTextEventSerializer {
     Assert.assertEquals("event 2", reader.readLine());
     Assert.assertEquals("event 3", reader.readLine());
     Assert.assertNull(reader.readLine());
+    reader.close();
 
     FileUtils.forceDelete(testFile);
   }
@@ -83,6 +84,7 @@ public class TestBodyTextEventSerializer {
     Assert.assertEquals("event 2", reader.readLine());
     Assert.assertEquals("event 3", reader.readLine());
     Assert.assertNull(reader.readLine());
+    reader.close();
 
     FileUtils.forceDelete(testFile);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/1675d49a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestHeaderAndBodyTextEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestHeaderAndBodyTextEventSerializer.java
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestHeaderAndBodyTextEventSerializer.java
new file mode 100644
index 0000000..df11144
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestHeaderAndBodyTextEventSerializer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.serialization;
+
+import com.google.common.base.Charsets;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.Context;
+import org.apache.flume.event.EventBuilder;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestHeaderAndBodyTextEventSerializer {
+
+  File testFile = new File("src/test/resources/events.txt");
+  File expectedFile = new File("src/test/resources/events.txt");
+
+  @Test
+  public void testWithNewline() throws FileNotFoundException, IOException {
+
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    OutputStream out = new FileOutputStream(testFile);
+    EventSerializer serializer =
+        EventSerializerFactory.getInstance("header_and_text", new Context(), out);
+    serializer.afterCreate();
+    serializer.write(EventBuilder.withBody("event 1", Charsets.UTF_8, headers));
+    serializer.write(EventBuilder.withBody("event 2", Charsets.UTF_8, headers));
+    serializer.write(EventBuilder.withBody("event 3", Charsets.UTF_8, headers));
+    serializer.flush();
+    serializer.beforeClose();
+    out.flush();
+    out.close();
+
+    BufferedReader reader = new BufferedReader(new FileReader(testFile));
+    Assert.assertEquals("{header2=value2, header1=value1} event 1", reader.readLine());
+    Assert.assertEquals("{header2=value2, header1=value1} event 2", reader.readLine());
+    Assert.assertEquals("{header2=value2, header1=value1} event 3", reader.readLine());
+    Assert.assertNull(reader.readLine());
+    reader.close();
+
+    FileUtils.forceDelete(testFile);
+  }
+
+  @Test
+  public void testNoNewline() throws FileNotFoundException, IOException {
+
+    Map<String, String> headers = new HashMap<String, String>();
+    headers.put("header1", "value1");
+    headers.put("header2", "value2");
+
+    OutputStream out = new FileOutputStream(testFile);
+    Context context = new Context();
+    context.put("appendNewline", "false");
+    EventSerializer serializer =
+        EventSerializerFactory.getInstance("header_and_text", context, out);
+    serializer.afterCreate();
+    serializer.write(EventBuilder.withBody("event 1\n", Charsets.UTF_8, headers));
+    serializer.write(EventBuilder.withBody("event 2\n", Charsets.UTF_8, headers));
+    serializer.write(EventBuilder.withBody("event 3\n", Charsets.UTF_8, headers));
+    serializer.flush();
+    serializer.beforeClose();
+    out.flush();
+    out.close();
+
+    BufferedReader reader = new BufferedReader(new FileReader(testFile));
+    Assert.assertEquals("{header2=value2, header1=value1} event 1", reader.readLine());
+    Assert.assertEquals("{header2=value2, header1=value1} event 2", reader.readLine());
+    Assert.assertEquals("{header2=value2, header1=value1} event 3", reader.readLine());
+    Assert.assertNull(reader.readLine());
+    reader.close();
+
+    FileUtils.forceDelete(testFile);
+  }
+
+}


Mime
View raw message