Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B496C1669A3 for ; Tue, 25 Jul 2017 14:04:22 +0200 (CEST) Received: (qmail 75517 invoked by uid 500); 25 Jul 2017 12:04:21 -0000 Mailing-List: contact commits-help@flume.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flume.apache.org Delivered-To: mailing list commits@flume.apache.org Received: (qmail 75503 invoked by uid 99); 25 Jul 2017 12:04:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Jul 2017 12:04:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C39F3DFF9F; Tue, 25 Jul 2017 12:04:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: denes@apache.org To: commits@flume.apache.org Message-Id: <885745f180ff46f485c341e1430480f6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flume git commit: FLUME-2620. File Channel to support empty values in headers Date: Tue, 25 Jul 2017 12:04:20 +0000 (UTC) Repository: flume Updated Branches: refs/heads/trunk c570a51b3 -> 1e69fc7c2 FLUME-2620. File Channel to support empty values in headers The Flume user guide does not specify whether a value in event header could be null or not. Given an external system generating events whose header values can be null and a user configures Flume with Memory Channel then he will have no trouble. Later on when the user changes Memory Channel to File Channel then Flume will fail with NPE. It is because FC is serializing events with protocol buffer and header values are defined as required in the proto file. In this patch I have changed the value field to optional. 
However, protocol buffer does not have a notation for null and setting a field to null raises NPE again. Added a null check before serialization to prevent this. There is one caveat: When an optional field is not set, at deserialization it will be set to a default value: in this case it will be an empty string. Reviewers: Miklos Csanady (Marcell Hegedus via Denes Arvay) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1e69fc7c Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1e69fc7c Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1e69fc7c Branch: refs/heads/trunk Commit: 1e69fc7c29f104a2117a62de11cba9b2a2c740e1 Parents: c570a51 Author: Marcell Hegedus Authored: Wed Jul 19 14:27:56 2017 +0200 Committer: Denes Arvay Committed: Wed Jul 19 14:27:56 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/flume/channel/file/Put.java | 7 +++- .../flume/channel/file/proto/ProtosFactory.java | 40 ++++++++------------ .../src/main/proto/filechannel.proto | 2 +- .../flume/channel/file/TestFileChannel.java | 24 ++++++++++++ .../apache/flume/channel/TestMemoryChannel.java | 22 +++++++++++ 5 files changed, 68 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java index 0a70a24..c5ea290 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Put.java @@ -82,8 +82,11 @@ 
class Put extends TransactionEventRecord { for (String key : headers.keySet()) { String value = headers.get(key); headerBuilder.clear(); - eventBuilder.addHeaders(headerBuilder.setKey(key) - .setValue(value).build()); + headerBuilder.setKey(key); + if (value != null) { + headerBuilder.setValue(value); + } + eventBuilder.addHeaders(headerBuilder.build()); } } eventBuilder.setBody(ByteString.copyFrom(event.getBody())); http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java index 50492cc..202f33d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java @@ -6831,17 +6831,17 @@ public final class ProtosFactory { com.google.protobuf.ByteString getKeyBytes(); - // required string value = 2; + // optional string value = 2; /** - * required string value = 2; + * optional string value = 2; */ boolean hasValue(); /** - * required string value = 2; + * optional string value = 2; */ java.lang.String getValue(); /** - * required string value = 2; + * optional string value = 2; */ com.google.protobuf.ByteString getValueBytes(); @@ -6990,17 +6990,17 @@ public final class ProtosFactory { } } - // required string value = 2; + // optional string value = 2; public static final int VALUE_FIELD_NUMBER = 2; private java.lang.Object value_; /** - * required string value = 2; + * optional string value = 2; */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * required string value = 
2; + * optional string value = 2; */ public java.lang.String getValue() { java.lang.Object ref = value_; @@ -7017,7 +7017,7 @@ public final class ProtosFactory { } } /** - * required string value = 2; + * optional string value = 2; */ public com.google.protobuf.ByteString getValueBytes() { @@ -7046,10 +7046,6 @@ public final class ProtosFactory { memoizedIsInitialized = 0; return false; } - if (!hasValue()) { - memoizedIsInitialized = 0; - return false; - } memoizedIsInitialized = 1; return true; } @@ -7271,10 +7267,6 @@ public final class ProtosFactory { return false; } - if (!hasValue()) { - - return false; - } return true; } @@ -7371,16 +7363,16 @@ public final class ProtosFactory { return this; } - // required string value = 2; + // optional string value = 2; private java.lang.Object value_ = ""; /** - * required string value = 2; + * optional string value = 2; */ public boolean hasValue() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** - * required string value = 2; + * optional string value = 2; */ public java.lang.String getValue() { java.lang.Object ref = value_; @@ -7394,7 +7386,7 @@ public final class ProtosFactory { } } /** - * required string value = 2; + * optional string value = 2; */ public com.google.protobuf.ByteString getValueBytes() { @@ -7410,7 +7402,7 @@ public final class ProtosFactory { } } /** - * required string value = 2; + * optional string value = 2; */ public Builder setValue( java.lang.String value) { @@ -7423,7 +7415,7 @@ public final class ProtosFactory { return this; } /** - * required string value = 2; + * optional string value = 2; */ public Builder clearValue() { bitField0_ = (bitField0_ & ~0x00000002); @@ -7432,7 +7424,7 @@ public final class ProtosFactory { return this; } /** - * required string value = 2; + * optional string value = 2; */ public Builder setValueBytes( com.google.protobuf.ByteString value) { @@ -7546,7 +7538,7 @@ public final class ProtosFactory { "ansactionEventFooter\">\n\nFlumeEvent\022\"\n\007he" 
+ "aders\030\001 \003(\0132\021.FlumeEventHeader\022\014\n\004body\030\002", " \002(\014\".\n\020FlumeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n" + - "\005value\030\002 \002(\tB4\n#org.apache.flume.channel" + + "\005value\030\002 \001(\tB4\n#org.apache.flume.channel" + ".file.protoB\rProtosFactory" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto index 25520e8..929b41d 100644 --- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto +++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto @@ -83,5 +83,5 @@ message FlumeEvent { message FlumeEventHeader { required string key = 1; - required string value = 2; + optional string value = 2; } http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java index 8efe991..a3d27f7 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel.file; +import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import 
com.google.common.collect.Sets; @@ -69,6 +70,7 @@ public class TestFileChannel extends TestFileChannelBase { private static final Logger LOG = LoggerFactory .getLogger(TestFileChannel.class); + public static final String TEST_KEY = "test_key"; @Before public void setup() throws Exception { @@ -234,6 +236,28 @@ public class TestFileChannel extends TestFileChannelBase { } @Test + public void testPutConvertsNullValueToEmptyStrInHeader() throws Exception { + channel.start(); + + Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8), + Collections.singletonMap(TEST_KEY, null)); + + Transaction txPut = channel.getTransaction(); + txPut.begin(); + channel.put(event); + txPut.commit(); + txPut.close(); + + Transaction txTake = channel.getTransaction(); + txTake.begin(); + Event eventTaken = channel.take(); + Assert.assertArrayEquals(event.getBody(), eventTaken.getBody()); + Assert.assertEquals("", eventTaken.getHeaders().get(TEST_KEY)); + txTake.commit(); + txTake.close(); + } + + @Test public void testCommitAfterNoPutTake() throws Exception { channel.start(); Assert.assertTrue(channel.isOpen()); http://git-wip-us.apache.org/repos/asf/flume/blob/1e69fc7c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index 344bb58..f7e43eb 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -19,6 +19,7 @@ package org.apache.flume.channel; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableMap; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -32,6 +33,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; 
+import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; @@ -72,6 +74,26 @@ public class TestMemoryChannel { } @Test + public void testPutAcceptsNullValueInHeader() { + Configurables.configure(channel, new Context()); + + Event event = EventBuilder.withBody("test body".getBytes(Charsets.UTF_8), + Collections.singletonMap("test_key", null)); + + Transaction txPut = channel.getTransaction(); + txPut.begin(); + channel.put(event); + txPut.commit(); + txPut.close(); + + Transaction txTake = channel.getTransaction(); + txTake.begin(); + Event eventTaken = channel.take(); + Assert.assertEquals(event, eventTaken); + txTake.commit(); + } + + @Test public void testChannelResize() { Context context = new Context(); Map parms = new HashMap();