camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [5/6] git commit: CAMEL-6899: camel-stream has headers with index/complete flag so you know this information, such as its the last, and there was N number of lines.
Date Tue, 12 Nov 2013 10:01:11 GMT
CAMEL-6899: camel-stream now sets headers with the index and a complete flag, so you know this information,
such as whether a message is the last one and how many lines there were.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/79739987
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/79739987
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/79739987

Branch: refs/heads/camel-2.12.x
Commit: 797399877f549d1d4edd8cfc8a541b9334cea08a
Parents: ac70d07
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Nov 12 11:01:33 2013 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Nov 12 11:01:58 2013 +0100

----------------------------------------------------------------------
 .../camel/component/stream/StreamConstants.java | 26 +++++++++++
 .../camel/component/stream/StreamConsumer.java  | 45 ++++++++++----------
 .../camel/component/stream/StreamEndpoint.java  |  9 ++++
 .../component/stream/ScanStreamFileTest.java    |  5 +++
 .../StreamGroupLinesLastStrategyTest.java       |  4 ++
 .../stream/StreamGroupLinesStrategyTest.java    |  4 ++
 .../component/stream/StreamGroupLinesTest.java  |  4 ++
 7 files changed, 75 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java
new file mode 100644
index 0000000..b338c43
--- /dev/null
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConstants.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.stream;
+
+public final class StreamConstants {
+
+    public static final String STREAM_INDEX = "CamelStreamIndex";
+    public static final String STREAM_COMPLETE = "CamelStreamComplete";
+
+    private StreamConstants() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
index 1e8f9a9..ed33a0d 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamConsumer.java
@@ -32,10 +32,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 
@@ -122,6 +120,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
     }
 
     private void readFromStream() throws Exception {
+        long index = 0;
         String line;
         BufferedReader br = initializeStream();
 
@@ -132,7 +131,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
                 LOG.trace("Read line: {}", line);
                 boolean eos = line == null;
                 if (!eos && isRunAllowed()) {
-                    processLine(line);
+                    index = processLine(line, false, index);
                 } else if (eos && isRunAllowed() && endpoint.isRetry()) {
                     //try and re-open stream
                     br = initializeStream();
@@ -147,20 +146,29 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
         } else {
             // regular read stream once until end of stream
             boolean eos = false;
+            String line2 = null;
             while (!eos && isRunAllowed()) {
                 if (endpoint.getPromptMessage() != null) {
                     doPromptMessage();
                 }
 
-                line = br.readLine();
+                if (line2 == null) {
+                    line = br.readLine();
+                } else {
+                    line = line2;
+                }
                 LOG.trace("Read line: {}", line);
+
                 eos = line == null;
                 if (!eos && isRunAllowed()) {
-                    processLine(line);
+                    // read ahead if there is more data
+                    line2 = br.readLine();
+                    boolean last = line2 == null;
+                    index = processLine(line, last, index);
                 }
             }
             // EOL so trigger any
-            processLine(null);
+            processLine(null, true, index);
         }
         // important: do not close the reader as it will close the standard system.in etc.
     }
@@ -168,9 +176,7 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
     /**
      * Strategy method for processing the line
      */
-    protected synchronized void processLine(String line) throws Exception {
-        boolean last = line == null;
-
+    protected synchronized long processLine(String line, boolean last, long index) throws Exception {
         if (endpoint.getGroupLines() > 0) {
             // remember line
             if (line != null) {
@@ -180,29 +186,24 @@ public class StreamConsumer extends DefaultConsumer implements Runnable {
             // should we flush lines?
             if (!lines.isEmpty() && (lines.size() >= endpoint.getGroupLines() || last)) {
                 // spit out lines as we hit the size, or it was the last
-                Exchange exchange = endpoint.createExchange();
-
-                // create message with the lines
-                Message msg = new DefaultMessage();
                 List<String> copy = new ArrayList<String>(lines);
-                msg.setBody(endpoint.getGroupStrategy().groupLines(copy));
-                exchange.setIn(msg);
+                Object body = endpoint.getGroupStrategy().groupLines(copy);
+                // remember to inc index when we create an exchange
+                Exchange exchange = endpoint.createExchange(body, index++, last);
 
                 // clear lines
                 lines.clear();
 
                 getProcessor().process(exchange);
             }
-        } else if (!last) {
+        } else if (line != null) {
             // single line
-            Exchange exchange = endpoint.createExchange();
-
-            Message msg = new DefaultMessage();
-            msg.setBody(line);
-            exchange.setIn(msg);
-
+            // remember to inc index when we create an exchange
+            Exchange exchange = endpoint.createExchange(line, index++, last);
             getProcessor().process(exchange);
         }
+
+        return index;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
index c091e65..9fc46f3 100644
--- a/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
+++ b/components/camel-stream/src/main/java/org/apache/camel/component/stream/StreamEndpoint.java
@@ -20,6 +20,7 @@ import java.nio.charset.Charset;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -70,6 +71,14 @@ public class StreamEndpoint extends DefaultEndpoint {
         return true;
     }
 
+    protected Exchange createExchange(Object body, long index, boolean last) {
+        Exchange exchange = createExchange();
+        exchange.getIn().setBody(body);
+        exchange.getIn().setHeader(StreamConstants.STREAM_INDEX, index);
+        exchange.getIn().setHeader(StreamConstants.STREAM_COMPLETE, last);
+        return exchange;
+    }
+
     // Properties
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
index 2f7e89b..60096f0 100644
--- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/ScanStreamFileTest.java
@@ -48,6 +48,11 @@ public class ScanStreamFileTest extends CamelTestSupport {
     public void testScanFile() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMinimumMessageCount(2);
+        mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0);
+        mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false);
+        mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1);
+        // a scanStream=true is never finished
+        mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false);
 
         FileOutputStream fos = new FileOutputStream(file);
         try {

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
index 59ad264..279bae0 100644
--- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesLastStrategyTest.java
@@ -27,6 +27,10 @@ public class StreamGroupLinesLastStrategyTest extends StreamGroupLinesStrategyTe
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(2);
         mock.setAssertPeriod(1000);
+        mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0);
+        mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false);
+        mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1);
+        mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true);
 
         assertMockEndpointsSatisfied();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
index 047319e..4352e13 100644
--- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesStrategyTest.java
@@ -49,6 +49,10 @@ public class StreamGroupLinesStrategyTest extends StreamGroupLinesTest {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(2);
         mock.setAssertPeriod(1000);
+        mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0);
+        mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false);
+        mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1);
+        mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true);
 
         assertMockEndpointsSatisfied();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/79739987/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
index 09d92a6..8fe19fc 100644
--- a/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
+++ b/components/camel-stream/src/test/java/org/apache/camel/component/stream/StreamGroupLinesTest.java
@@ -60,6 +60,10 @@ public class StreamGroupLinesTest extends CamelTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(2);
         mock.setAssertPeriod(1000);
+        mock.message(0).header(StreamConstants.STREAM_INDEX).isEqualTo(0);
+        mock.message(0).header(StreamConstants.STREAM_COMPLETE).isEqualTo(false);
+        mock.message(1).header(StreamConstants.STREAM_INDEX).isEqualTo(1);
+        mock.message(1).header(StreamConstants.STREAM_COMPLETE).isEqualTo(true);
 
         assertMockEndpointsSatisfied();
 


Mime
View raw message