flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hshreedha...@apache.org
Subject git commit: FLUME-2426. Support interceptors in the Embedded Agent
Date Fri, 12 Sep 2014 18:51:45 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.6 49f8eb2a3 -> c458a838a


FLUME-2426. Support interceptors in the Embedded Agent

(Johny Rufus via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/c458a838
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/c458a838
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/c458a838

Branch: refs/heads/flume-1.6
Commit: c458a838ad3dc68946662050ee96353d36ee4e74
Parents: 49f8eb2
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Fri Sep 12 11:50:57 2014 -0700
Committer: Hari Shreedharan <hshreedharan@apache.org>
Committed: Fri Sep 12 11:51:31 2014 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeDeveloperGuide.rst     |  8 ++++++-
 .../flume/agent/embedded/EmbeddedAgent.java     |  4 +++-
 .../flume/agent/embedded/TestEmbeddedAgent.java | 24 +++++++++++++++++++-
 .../TestEmbeddedAgentConfiguration.java         |  4 ++++
 4 files changed, 37 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
index ec6a735..e3b60e6 100644
--- a/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeDeveloperGuide.rst
@@ -450,7 +450,7 @@ sources, sinks, and channels are allowed. Specifically the source used
 is a special embedded source and events should be send to the source
 via the put, putAll methods on the EmbeddedAgent object. Only File Channel
 and Memory Channel are allowed as channels while Avro Sink is the only
-supported sink.
+supported sink. Interceptors are also supported by the embedded agent.
 
 Note: The embedded agent has a dependency on hadoop-core.jar.
 
@@ -470,6 +470,8 @@ channel.*             --                Configuration options for the channel ty
 sink.*                --                Configuration options for the sink. See AvroSink user guide for an exhaustive list, however note AvroSink requires at least hostname and port.
 **processor.type**    --                Either ``failover`` or ``load_balance`` which correspond to FailoverSinksProcessor and LoadBalancingSinkProcessor respectively.
 processor.*           --                Configuration options for the sink processor selected. See FailoverSinksProcessor and LoadBalancingSinkProcessor user guide for an exhaustive list.
+source.interceptors   --                Space-separated list of interceptors
+source.interceptors.* --                Configuration options for individual interceptors specified in the source.interceptors property
 ====================  ================  ==============================================
 
 Below is an example of how to use the agent:
@@ -487,6 +489,10 @@ Below is an example of how to use the agent:
     properties.put("sink2.hostname", "collector2.apache.org");
     properties.put("sink2.port",  "5565");
     properties.put("processor.type", "load_balance");
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "static");
+    properties.put("source.interceptors.i1.key", "key1");
+    properties.put("source.interceptors.i1.value", "value1");
 
     EmbeddedAgent agent = new EmbeddedAgent("myagent");
 

http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index d02f440..32c9f18 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -111,7 +111,8 @@ public class EmbeddedAgent {
       throw new IllegalStateException("Cannot be started before being " +
           "configured");
     }
-    doStart();
+    // This check needs to be done before doStart(),
+    // as doStart() accesses sourceRunner.getSource()
     Source source = Preconditions.checkNotNull(sourceRunner.getSource(),
         "Source runner returned null source");
     if(source instanceof EmbeddedSource) {
@@ -120,6 +121,7 @@ public class EmbeddedAgent {
       throw new IllegalStateException("Unknown source type: " + source.
           getClass().getName());
     }
+    doStart();
     state = State.STARTED;
   }
   /**

http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
index 0d644c6..975ba8d 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java
@@ -61,7 +61,6 @@ public class TestEmbeddedAgent {
   private Map<String, String> headers;
   private byte[] body;
 
-
   @Before
   public void setUp() throws Exception {
     headers = Maps.newHashMap();
@@ -93,6 +92,7 @@ public class TestEmbeddedAgent {
 
     agent = new EmbeddedAgent("test-" + serialNumber.incrementAndGet());
   }
+
   @After
   public void tearDown() throws Exception {
     if(agent != null) {
@@ -110,6 +110,7 @@ public class TestEmbeddedAgent {
       }
     }
   }
+
   @Test(timeout = 30000L)
   public void testPut() throws Exception {
     agent.configure(properties);
@@ -124,6 +125,7 @@ public class TestEmbeddedAgent {
     Assert.assertArrayEquals(body, event.getBody());
     Assert.assertEquals(headers, event.getHeaders());
   }
+
   @Test(timeout = 30000L)
   public void testPutAll() throws Exception {
     List<Event> events = Lists.newArrayList();
@@ -141,7 +143,27 @@ public class TestEmbeddedAgent {
     Assert.assertEquals(headers, event.getHeaders());
   }
 
+  @Test(timeout = 30000L)
+  public void testPutWithInterceptors() throws Exception {
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "static");
+    properties.put("source.interceptors.i1.key", "key2");
+    properties.put("source.interceptors.i1.value", "value2");
+
+    agent.configure(properties);
+    agent.start();
+    agent.put(EventBuilder.withBody(body, headers));
 
+    Event event;
+    while((event = eventCollector.poll()) == null) {
+      Thread.sleep(500L);
+    }
+    Assert.assertNotNull(event);
+    Assert.assertArrayEquals(body, event.getBody());
+    Map<String, String> newHeaders = new HashMap<String, String>(headers);
+    newHeaders.put("key2", "value2");
+    Assert.assertEquals(newHeaders, event.getHeaders());
+  }
 
   static class EventCollector implements AvroSourceProtocol {
     private final Queue<AvroFlumeEvent> eventQueue =

http://git-wip-us.apache.org/repos/asf/flume/blob/c458a838/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
index f70d0b1..f4a9a58 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
@@ -46,6 +46,8 @@ public class TestEmbeddedAgentConfiguration {
     properties.put("sink2.hostname", "sink2.host");
     properties.put("sink2.port", "2");
     properties.put("processor.type", "load_balance");
+    properties.put("source.interceptors", "i1");
+    properties.put("source.interceptors.i1.type", "timestamp");
   }
 
 
@@ -91,6 +93,8 @@ public class TestEmbeddedAgentConfiguration {
     expected.put("test1.sources.source-test1.channels", "channel-test1");
     expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration.
         SOURCE_TYPE_EMBEDDED);
+    expected.put("test1.sources.source-test1.interceptors", "i1");
+    expected.put("test1.sources.source-test1.interceptors.i1.type", "timestamp");
     Assert.assertEquals(expected, actual);
   }
 


Mime
View raw message