flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From de...@apache.org
Subject flume git commit: FLUME-2620. File Channel to support empty values in headers
Date Tue, 25 Jul 2017 12:04:20 GMT
Repository: flume
Updated Branches:
  refs/heads/trunk c570a51b3 -> 1e69fc7c2


FLUME-2620. File Channel to support empty values in headers

The Flume user guide does not specify whether a value in an event header can be null.
If an external system generates events whose header values can be null and the user configures
Flume with a Memory Channel, there is no problem.
When the user later switches from Memory Channel to File Channel, Flume fails with an NPE.
This is because FC serializes events with Protocol Buffers and header values are defined
as required in the proto file.
In this patch I have changed the value field to optional. However, Protocol Buffers do not
have a notation for null, and setting a field to null raises an NPE again. Added a null check
before serialization to prevent this.
There is one caveat: when an optional field is not set, it will be set to a default value at
deserialization: in this case, the empty string.

Reviewers: Miklos Csanady

(Marcell Hegedus via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e69fc7c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e69fc7c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e69fc7c

Branch: refs/heads/trunk
Commit: 1e69fc7c29f104a2117a62de11cba9b2a2c740e1
Parents: c570a51
Author: Marcell Hegedus <marcell.hegedus@gmail.com>
Authored: Wed Jul 19 14:27:56 2017 +0200
Committer: Denes Arvay <denes@apache.org>
Committed: Wed Jul 19 14:27:56 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Put.java |  7 +++-
 .../flume/channel/file/proto/ProtosFactory.java | 40 ++++++++------------
 .../src/main/proto/filechannel.proto            |  2 +-
 .../flume/channel/file/TestFileChannel.java     | 24 ++++++++++++
 .../apache/flume/channel/TestMemoryChannel.java | 22 +++++++++++
 5 files changed, 68 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
index 0a70a24..c5ea290 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java
@@ -82,8 +82,11 @@ class Put extends TransactionEventRecord {
       for (String key : headers.keySet()) {
         String value = headers.get(key);
         headerBuilder.clear();
-        eventBuilder.addHeaders(headerBuilder.setKey(key)
-            .setValue(value).build());
+        headerBuilder.setKey(key);
+        if (value != null) {
+          headerBuilder.setValue(value);
+        }
+        eventBuilder.addHeaders(headerBuilder.build());
       }
     }
     eventBuilder.setBody(ByteString.copyFrom(event.getBody()));

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index 50492cc..202f33d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -6831,17 +6831,17 @@ public final class ProtosFactory {
     com.google.protobuf.ByteString
         getKeyBytes();
 
-    // required string value = 2;
+    // optional string value = 2;
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     boolean hasValue();
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     java.lang.String getValue();
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     com.google.protobuf.ByteString
         getValueBytes();
@@ -6990,17 +6990,17 @@ public final class ProtosFactory {
       }
     }
 
-    // required string value = 2;
+    // optional string value = 2;
     public static final int VALUE_FIELD_NUMBER = 2;
     private java.lang.Object value_;
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public boolean hasValue() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public java.lang.String getValue() {
       java.lang.Object ref = value_;
@@ -7017,7 +7017,7 @@ public final class ProtosFactory {
       }
     }
     /**
-     * <code>required string value = 2;</code>
+     * <code>optional string value = 2;</code>
      */
     public com.google.protobuf.ByteString
         getValueBytes() {
@@ -7046,10 +7046,6 @@ public final class ProtosFactory {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasValue()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -7271,10 +7267,6 @@ public final class ProtosFactory {
           
           return false;
         }
-        if (!hasValue()) {
-          
-          return false;
-        }
         return true;
       }
 
@@ -7371,16 +7363,16 @@ public final class ProtosFactory {
         return this;
       }
 
-      // required string value = 2;
+      // optional string value = 2;
       private java.lang.Object value_ = "";
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public boolean hasValue() {
         return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public java.lang.String getValue() {
         java.lang.Object ref = value_;
@@ -7394,7 +7386,7 @@ public final class ProtosFactory {
         }
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public com.google.protobuf.ByteString
           getValueBytes() {
@@ -7410,7 +7402,7 @@ public final class ProtosFactory {
         }
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder setValue(
           java.lang.String value) {
@@ -7423,7 +7415,7 @@ public final class ProtosFactory {
         return this;
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder clearValue() {
         bitField0_ = (bitField0_ & ~0x00000002);
@@ -7432,7 +7424,7 @@ public final class ProtosFactory {
         return this;
       }
       /**
-       * <code>required string value = 2;</code>
+       * <code>optional string value = 2;</code>
        */
       public Builder setValueBytes(
           com.google.protobuf.ByteString value) {
@@ -7546,7 +7538,7 @@ public final class ProtosFactory {
       "ansactionEventFooter\">\n\nFlumeEvent\022\"\n\007he" +
       "aders\030\001 \003(\0132\021.FlumeEventHeader\022\014\n\004body\030\002",
       " \002(\014\".\n\020FlumeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n" +
-      "\005value\030\002 \002(\tB4\n#org.apache.flume.channel" +
+      "\005value\030\002 \001(\tB4\n#org.apache.flume.channel" +
       ".file.protoB\rProtosFactory"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
index 25520e8..929b41d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -83,5 +83,5 @@ message FlumeEvent {
 
 message FlumeEventHeader {
   required string key = 1;
-  required string value = 2;
+  optional string value = 2;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
index 8efe991..a3d27f7 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -69,6 +70,7 @@ public class TestFileChannel extends TestFileChannelBase {
 
   private static final Logger LOG = LoggerFactory
           .getLogger(TestFileChannel.class);
+  public static final String TEST_KEY = "test_key";
 
   @Before
   public void setup() throws Exception {
@@ -234,6 +236,28 @@ public class TestFileChannel extends TestFileChannelBase {
   }
 
   @Test
+  public void testPutConvertsNullValueToEmptyStrInHeader() throws Exception {
+    channel.start();
+
+    Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+        Collections.<String, String>singletonMap(TEST_KEY, null));
+
+    Transaction txPut = channel.getTransaction();
+    txPut.begin();
+    channel.put(event);
+    txPut.commit();
+    txPut.close();
+
+    Transaction txTake = channel.getTransaction();
+    txTake.begin();
+    Event eventTaken = channel.take();
+    Assert.assertArrayEquals(event.getBody(), eventTaken.getBody());
+    Assert.assertEquals("", eventTaken.getHeaders().get(TEST_KEY));
+    txTake.commit();
+    txTake.close();
+  }
+
+  @Test
   public void testCommitAfterNoPutTake() throws Exception {
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index 344bb58..f7e43eb 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.channel;
 
+import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableMap;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -32,6 +33,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -72,6 +74,26 @@ public class TestMemoryChannel {
   }
 
   @Test
+  public void testPutAcceptsNullValueInHeader() {
+    Configurables.configure(channel, new Context());
+
+    Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8),
+        Collections.<String, String>singletonMap("test_key", null));
+
+    Transaction txPut = channel.getTransaction();
+    txPut.begin();
+    channel.put(event);
+    txPut.commit();
+    txPut.close();
+
+    Transaction txTake = channel.getTransaction();
+    txTake.begin();
+    Event eventTaken = channel.take();
+    Assert.assertEquals(event, eventTaken);
+    txTake.commit();
+  }
+
+  @Test
   public void testChannelResize() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();


Mime
View raw message