camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [1/2] git commit: CAMEL-7632 Add streaming mode to SplunkConsumer
Date Fri, 25 Jul 2014 03:11:38 GMT
Repository: camel
Updated Branches:
  refs/heads/master 7f43263d8 -> cfd8ad0ba


CAMEL-7632 Add streaming mode to SplunkConsumer


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c0408636
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c0408636
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c0408636

Branch: refs/heads/master
Commit: c040863682526bfcc46ae38c4b6d8f012922ff08
Parents: 7f43263
Author: dmitrimedvedev <dmitri.medvedev@gmail.com>
Authored: Tue Jul 22 14:33:27 2014 -0700
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Fri Jul 25 10:56:14 2014 +0800

----------------------------------------------------------------------
 .../component/splunk/SplunkConfiguration.java   | 26 +++++++++++++
 .../camel/component/splunk/SplunkConsumer.java  | 39 +++++++++++++++++--
 .../splunk/support/SplunkDataReader.java        | 41 +++++++++++++-------
 .../splunk/support/SplunkResultProcessor.java   | 26 +++++++++++++
 4 files changed, 115 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c0408636/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
index dc4900c..4a0fcff 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConfiguration.java
@@ -67,6 +67,12 @@ public class SplunkConfiguration {
     private String initEarliestTime;
     private SplunkConnectionFactory connectionFactory;
 
+    /**
+     * Streaming mode sends exchanges as they are received, rather than in a batch
+     */
+    @UriParam
+    private Boolean streaming;
+
     public String getInitEarliestTime() {
         return initEarliestTime;
     }
@@ -195,6 +201,26 @@ public class SplunkConfiguration {
         this.password = password;
     }
 
+    /**
+     * Returns streaming mode.
+     * <p>
+     * Streaming mode sends exchanges as they are received, rather than in a batch.
+     */
+    public boolean isStreaming() {
+        return streaming != null ? streaming : false;
+    }
+    
+    /**
+     * Sets streaming mode.
+     * <p>
+     * Streaming mode sends exchanges as they are received, rather than in a batch.
+     *  
+     * @param streaming
+     */
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
+    }
+    
     public int getConnectionTimeout() {
         return connectionTimeout;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/c0408636/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
index 0a71877..2929d92 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/SplunkConsumer.java
@@ -20,11 +20,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.splunk.event.SplunkEvent;
 import org.apache.camel.component.splunk.support.SplunkDataReader;
+import org.apache.camel.component.splunk.support.SplunkResultProcessor;
 import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -56,9 +58,40 @@ public class SplunkConsumer extends ScheduledBatchPollingConsumer {
     @Override
     protected int poll() throws Exception {
         try {
-            List<SplunkEvent> events = dataReader.read();
-            Queue<Exchange> exchanges = createExchanges(events);
-            return processBatch(CastUtils.cast(exchanges));
+            if (endpoint.getConfiguration().isStreaming()) {
+                dataReader.read(new SplunkResultProcessor() {
+
+                    @Override
+                    public void process(SplunkEvent splunkEvent) {
+                        final Exchange exchange = getEndpoint().createExchange();
+                        Message message = exchange.getIn();
+                        message.setBody(splunkEvent);
+
+                        try {
+                            LOG.trace("Processing exchange [{}]...", exchange);
+                            getAsyncProcessor().process(exchange, new AsyncCallback() {
+                                @Override
+                                public void done(boolean doneSync) {
+                                    LOG.trace("Done processing exchange [{}]...", exchange);
+                                }
+                            });
+                        } catch (Exception e) {
+                            exchange.setException(e);
+                        }
+                        if (exchange.getException() != null) {
+                            getExceptionHandler().handleException("Error processing exchange",
exchange, exchange.getException());
+                        }
+                        
+                    }
+
+                });
+                // Return 0: no exchanges returned by poll, as exchanges have been returned
asynchronously
+                return 0;
+            } else {
+                List<SplunkEvent> events = dataReader.read();
+                Queue<Exchange> exchanges = createExchanges(events);
+                return processBatch(CastUtils.cast(exchanges));
+            }
         } catch (Exception e) {
             endpoint.reset(e);
             getExceptionHandler().handleException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/c0408636/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
index 78715a4..fe0f74b 100644
--- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java
@@ -85,15 +85,20 @@ public class SplunkDataReader {
     }
 
     public List<SplunkEvent> read() throws Exception {
+        // Read without callback
+        return read(null);
+    }
+
+    public List<SplunkEvent> read(SplunkResultProcessor callback) throws Exception
{
         switch (consumerType) {
         case NORMAL: {
-            return nonBlockingSearch();
+            return nonBlockingSearch(callback);
         }
         case REALTIME: {
-            return realtimeSearch();
+            return realtimeSearch(callback);
         }
         case SAVEDSEARCH: {
-            return savedSearch();
+            return savedSearch(callback);
         }
         default: {
             throw new RuntimeException("Unknown search mode " + consumerType);
@@ -182,7 +187,7 @@ public class SplunkDataReader {
         return eTime;
     }
 
-    private List<SplunkEvent> savedSearch() throws Exception {
+    private List<SplunkEvent> savedSearch(SplunkResultProcessor callback) throws Exception
{
         LOG.trace("saved search start");
 
         ServiceArgs queryArgs = new ServiceArgs();
@@ -219,13 +224,13 @@ public class SplunkDataReader {
         while (!job.isDone()) {
             Thread.sleep(2000);
         }
-        List<SplunkEvent> data = extractData(job, false);
+        List<SplunkEvent> data = extractData(job, false, callback);
         this.lastSuccessfulReadTime = startTime;
         return data;
 
     }
 
-    private List<SplunkEvent> nonBlockingSearch() throws Exception {
+    private List<SplunkEvent> nonBlockingSearch(SplunkResultProcessor callback) throws
Exception {
         LOG.debug("non block search start");
 
         JobArgs queryArgs = new JobArgs();
@@ -233,12 +238,12 @@ public class SplunkDataReader {
         Calendar startTime = Calendar.getInstance();
         populateArgs(queryArgs, startTime, false);
 
-        List<SplunkEvent> data = runQuery(queryArgs, false);
+        List<SplunkEvent> data = runQuery(queryArgs, false, callback);
         lastSuccessfulReadTime = startTime;
         return data;
     }
 
-    private List<SplunkEvent> realtimeSearch() throws Exception {
+    private List<SplunkEvent> realtimeSearch(SplunkResultProcessor callback) throws
Exception {
         LOG.debug("realtime search start");
 
         JobArgs queryArgs = new JobArgs();
@@ -247,12 +252,12 @@ public class SplunkDataReader {
         Calendar startTime = Calendar.getInstance();
         populateArgs(queryArgs, startTime, true);
 
-        List<SplunkEvent> data = runQuery(queryArgs, true);
+        List<SplunkEvent> data = runQuery(queryArgs, true, callback);
         lastSuccessfulReadTime = startTime;
         return data;
     }
 
-    private List<SplunkEvent> runQuery(JobArgs queryArgs, boolean realtime) throws
Exception {
+    private List<SplunkEvent> runQuery(JobArgs queryArgs, boolean realtime, SplunkResultProcessor
callback) throws Exception {
         Service service = endpoint.getService();
         Job job = service.getJobs().create(getSearch(), queryArgs);
         LOG.debug("Running search : {} with queryArgs : {}", getSearch(), queryArgs);
@@ -270,10 +275,10 @@ public class SplunkDataReader {
                 Thread.sleep(500);
             }
         }
-        return extractData(job, realtime);
+        return extractData(job, realtime, callback);
     }
 
-    private List<SplunkEvent> extractData(Job job, boolean realtime) throws Exception
{
+    private List<SplunkEvent> extractData(Job job, boolean realtime, SplunkResultProcessor
callback) throws Exception {
         List<SplunkEvent> result = new ArrayList<SplunkEvent>();
         HashMap<String, String> data;
         SplunkEvent splunkData;
@@ -300,7 +305,11 @@ public class SplunkDataReader {
             resultsReader = new ResultsReaderJson(stream);
             while ((data = resultsReader.getNextEvent()) != null) {
                 splunkData = new SplunkEvent(data);
-                result.add(splunkData);
+                if (callback != null) {
+                    callback.process(splunkData);
+                } else {
+                    result.add(splunkData);
+                }
             }
             IOHelper.close(stream);
         } else {
@@ -319,7 +328,11 @@ public class SplunkDataReader {
                 resultsReader = new ResultsReaderJson(stream);
                 while ((data = resultsReader.getNextEvent()) != null) {
                     splunkData = new SplunkEvent(data);
-                    result.add(splunkData);
+                    if (callback != null) {
+                        callback.process(splunkData);
+                    } else {
+                        result.add(splunkData);
+                    }
                 }
                 offset += getCount();
                 IOHelper.close(stream);

http://git-wip-us.apache.org/repos/asf/camel/blob/c0408636/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkResultProcessor.java
----------------------------------------------------------------------
diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkResultProcessor.java
b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkResultProcessor.java
new file mode 100644
index 0000000..46ea42c
--- /dev/null
+++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkResultProcessor.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.splunk.support;
+
+import org.apache.camel.component.splunk.event.SplunkEvent;
+
+/**
+ * Processes splunk results
+ */
+public interface SplunkResultProcessor {
+    public void process(SplunkEvent splunkData);
+}


Mime
View raw message