camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [1/2] git commit: CAMEL-7080 Add support of lazy load to csv data format with thanks to Daneel
Date Sun, 22 Dec 2013 03:44:47 GMT
Updated Branches:
  refs/heads/master 5e216a0d7 -> 39abbbf1a


CAMEL-7080 Add support of lazy load to csv data format with thanks to Daneel


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/53b6dcc8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/53b6dcc8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/53b6dcc8

Branch: refs/heads/master
Commit: 53b6dcc8adc327d8b6c1682472c5d75c086f6938
Parents: 5e216a0
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Sun Dec 22 11:16:44 2013 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Sun Dec 22 11:16:44 2013 +0800

----------------------------------------------------------------------
 .../apache/camel/builder/DataFormatClause.java  |   8 ++
 .../camel/model/dataformat/CsvDataFormat.java   |  21 +++-
 components/camel-csv/pom.xml                    |   6 ++
 .../camel/dataformat/csv/CsvDataFormat.java     |  68 +++++++++----
 .../camel/dataformat/csv/CsvIterator.java       |  63 ++++++++++++
 .../camel/dataformat/csv/CsvIteratorTest.java   | 100 +++++++++++++++++++
 .../csv/CsvUnmarshalStreamSpringTest.java       |  56 +++++++++++
 .../dataformat/csv/CsvUnmarshalStreamTest.java  |  79 +++++++++++++++
 .../CsvUnmarshalStreamSpringTest-context.xml    |  32 ++++++
 9 files changed, 410 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java
index c2ce089..b28d062 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java
@@ -181,6 +181,14 @@ public class DataFormatClause<T extends ProcessorDefinition<?>> {
     }
 
     /**
+     * Uses the CSV data format for a huge file.
+     * Sequential access through an iterator.
+     */
+    public T csvLazyLoad() {
+        return dataFormat(new CsvDataFormat(true));
+    }
+
+    /**
      * Uses the custom data format
      */
     public T custom(String ref) {

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
index 3d5d534..cfd4f6b 100644
--- a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
+++ b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java
@@ -46,6 +46,8 @@ public class CsvDataFormat extends DataFormatDefinition {
     private String strategyRef;
     @XmlAttribute
     private Boolean skipFirstLine;
+    @XmlAttribute
+    private Boolean lazyLoad;
 
     public CsvDataFormat() {
         super("csv");
@@ -56,6 +58,11 @@ public class CsvDataFormat extends DataFormatDefinition {
         setDelimiter(delimiter);
     }
 
+    public CsvDataFormat(boolean lazyLoad) {
+        this();
+        setLazyLoad(lazyLoad);
+    }
+
     public Boolean isAutogenColumns() {
         return autogenColumns;
     }
@@ -96,6 +103,14 @@ public class CsvDataFormat extends DataFormatDefinition {
         this.skipFirstLine = skipFirstLine;
     }
 
+    public Boolean getLazyLoad() {
+        return lazyLoad;
+    }
+
+    public void setLazyLoad(Boolean lazyLoad) {
+        this.lazyLoad = lazyLoad;
+    }
+
     @Override
     protected DataFormat createDataFormat(RouteContext routeContext) {
         DataFormat csvFormat = super.createDataFormat(routeContext);
@@ -131,5 +146,9 @@ public class CsvDataFormat extends DataFormatDefinition {
         if (skipFirstLine != null) {
             setProperty(camelContext, dataFormat, "skipFirstLine", skipFirstLine);
         }
+
+        if (lazyLoad != null) {
+            setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad);
+        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/pom.xml
----------------------------------------------------------------------
diff --git a/components/camel-csv/pom.xml b/components/camel-csv/pom.xml
index c55bfde..a265c08 100644
--- a/components/camel-csv/pom.xml
+++ b/components/camel-csv/pom.xml
@@ -57,6 +57,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>com.googlecode.jmockit</groupId>
+      <artifactId>jmockit</artifactId>
+      <version>1.5</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
index 9ad7b53..4ce6e7e 100644
--- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
+++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java
@@ -16,13 +16,15 @@
  */
 package org.apache.camel.dataformat.csv;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -54,6 +56,10 @@ public class CsvDataFormat implements DataFormat {
     private boolean autogenColumns = true;
     private String delimiter;
     private boolean skipFirstLine;
+    /**
+     * Lazy row loading with iterator for big files.
+     */
+    private boolean lazyLoad;
 
     public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws Exception {
         if (delimiter != null) {
@@ -96,32 +102,42 @@ public class CsvDataFormat implements DataFormat {
         strategy.setDelimiter(config.getDelimiter());
 
         InputStreamReader in = new InputStreamReader(inputStream, IOHelper.getCharsetName(exchange));
-
+        CsvIterator csvIterator;
         try {
-            CSVParser parser = new CSVParser(in, strategy);
-            List<List<String>> list = new ArrayList<List<String>>();
-            boolean isFirstLine = true;
-            while (true) {
-                String[] strings = parser.getLine();
-                if (isFirstLine) {
-                    isFirstLine = false;
-                    if (skipFirstLine) {
-                        // skip considering the first line if we're asked to do so
-                        continue;
-                    }
-                }
-                if (strings == null) {
-                    break;
-                }
-                List<String> line = Arrays.asList(strings);
-                list.add(line);
+            CSVParser parser = createParser(in);
+            if (parser == null) {
+                IOHelper.close(in);
+                return Collections.emptyIterator();
             }
-            return list;
-        } finally {
+            csvIterator = new CsvIterator(parser, in);
+        } catch (IOException e) {
             IOHelper.close(in);
+            throw e;
+        }
+        if (lazyLoad) {
+            return csvIterator;
         }
+        return loadAllAsList(csvIterator);
     }
-    
+
+    private CSVParser createParser(InputStreamReader in) throws IOException {
+        CSVParser parser = new CSVParser(in, strategy);
+        if (skipFirstLine) {
+            if (null == parser.getLine()) {
+                return null;
+            }
+        }
+        return parser;
+    }
+
+    private List<List<String>> loadAllAsList(CsvIterator iter) throws IOException {
+        List<List<String>> list = new ArrayList<List<String>>();
+        while (iter.hasNext()) {
+            list.add(iter.next());
+        }
+        return list;
+    }
+
     public String getDelimiter() {
         return delimiter;
     }
@@ -170,6 +186,14 @@ public class CsvDataFormat implements DataFormat {
         this.skipFirstLine = skipFirstLine;
     }
 
+    public boolean isLazyLoad() {
+        return lazyLoad;
+    }
+
+    public void setLazyLoad(boolean lazyLoad) {
+        this.lazyLoad = lazyLoad;
+    }
+
     private synchronized void updateFieldsInConfig(Set<?> set, Exchange exchange) {
         for (Object value : set) {
             if (value != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
new file mode 100644
index 0000000..68df9c4
--- /dev/null
+++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java
@@ -0,0 +1,63 @@
+package org.apache.camel.dataformat.csv;
+
+import org.apache.camel.util.IOHelper;
+import org.apache.commons.csv.CSVParser;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ */
+public class CsvIterator implements Iterator<List<String>>, Closeable {
+
+    private final CSVParser parser;
+    private final InputStreamReader in;
+    private String[] line;
+
+    public CsvIterator(CSVParser parser, InputStreamReader in)
+            throws IOException
+    {
+        this.parser = parser;
+        this.in = in;
+        line = parser.getLine();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return line != null;
+    }
+
+    @Override
+    public List<String> next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        List<String> result = Arrays.asList(line);
+        try {
+            line = parser.getLine();
+        } catch (IOException e) {
+            line = null;
+            IOHelper.close(in);
+            throw new IllegalStateException(e);
+        }
+        if (line == null) {
+            IOHelper.close(in);
+        }
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java
new file mode 100644
index 0000000..db60e9a
--- /dev/null
+++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java
@@ -0,0 +1,100 @@
+package org.apache.camel.dataformat.csv;
+
+import mockit.Expectations;
+import mockit.Injectable;
+import org.apache.commons.csv.CSVParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/**
+ */
+public class CsvIteratorTest {
+
+    public static final String HDD_CRASH = "HDD crash";
+
+    @Test
+    public void closeIfError(
+            final @Injectable InputStreamReader reader,
+            final @Injectable CSVParser parser)
+            throws IOException
+    {
+        new Expectations() {
+            {
+                parser.getLine();
+                result = new String[] { "1" };
+
+                parser.getLine();
+                result = new String[] { "2" };
+
+                parser.getLine();
+                result = new IOException(HDD_CRASH);
+
+                reader.close();
+            }
+        };
+
+        CsvIterator iterator = new CsvIterator(parser, reader);
+        Assert.assertTrue(iterator.hasNext());
+        Assert.assertEquals(Arrays.asList("1"), iterator.next());
+        Assert.assertTrue(iterator.hasNext());
+
+        try {
+            iterator.next();
+            Assert.fail("exception expected");
+        } catch (IllegalStateException e) {
+            Assert.assertEquals(HDD_CRASH, e.getCause().getMessage());
+        }
+
+        Assert.assertFalse(iterator.hasNext());
+
+        try {
+            iterator.next();
+            Assert.fail("exception expected");
+        } catch (NoSuchElementException e) {
+            // okay
+        }
+    }
+
+    @Test
+    public void normalCycle(final @Injectable InputStreamReader reader,
+                            final @Injectable CSVParser parser)
+            throws IOException
+    {
+        new Expectations() {
+            {
+                parser.getLine();
+                result = new String[] { "1" };
+
+                parser.getLine();
+                result = new String[] { "2" };
+
+                parser.getLine();
+                result = null;
+
+                reader.close();
+            }
+        };
+
+        CsvIterator iterator = new CsvIterator(parser, reader);
+        Assert.assertTrue(iterator.hasNext());
+        Assert.assertEquals(Arrays.asList("1"), iterator.next());
+
+        Assert.assertTrue(iterator.hasNext());
+        Assert.assertEquals(Arrays.asList("2"), iterator.next());
+
+        Assert.assertFalse(iterator.hasNext());
+
+        try {
+            iterator.next();
+            Assert.fail("exception expected");
+        } catch (NoSuchElementException e) {
+            // okay
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java
new file mode 100644
index 0000000..0727801
--- /dev/null
+++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.spring.CamelSpringTestSupport;
+import org.junit.Test;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class CsvUnmarshalStreamSpringTest extends CamelSpringTestSupport {
+
+    public static final String MESSAGE = "message";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCsvUnMarshal() throws Exception {
+        result.expectedMessageCount(1);
+
+        template.sendBody("direct:start", MESSAGE + "\n");
+
+        assertMockEndpointsSatisfied();
+
+        Iterator<String> body = result.getReceivedExchanges().get(0)
+                .getIn().getBody(Iterator.class);
+        assertEquals(CsvIterator.class, body.getClass());
+        assertEquals(Arrays.asList(MESSAGE), body.next());
+    }
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return new ClassPathXmlApplicationContext(
+                "org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml");
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java
new file mode 100644
index 0000000..21627b2
--- /dev/null
+++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dataformat.csv;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Integration test for lazy-load unmarshalling with the <code>CsvDataFormat</code>
+ * @version 
+ */
+public class CsvUnmarshalStreamTest extends CamelTestSupport {
+
+    public static final int EXPECTED_COUNT = 3;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint result;
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCsvUnMarshal() throws Exception {
+        result.expectedMessageCount(EXPECTED_COUNT);
+
+        String message = "";
+        for (int i = 0; i < EXPECTED_COUNT; ++i) {
+            message += i + "|\"" + i + "\n" + i + "\"\n";
+        }
+
+        template.sendBody("direct:start", message);
+
+        assertMockEndpointsSatisfied();
+
+        for (int i = 0; i < EXPECTED_COUNT; ++i) {
+            List<String> body = result.getReceivedExchanges().get(i)
+                    .getIn().getBody(List.class);
+            assertEquals(2, body.size());
+            assertEquals(String.valueOf(i), body.get(0));
+            assertEquals(String.format("%d\n%d", i, i), body.get(1));
+        }
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                CsvDataFormat csv = new CsvDataFormat();
+                csv.setLazyLoad(true);
+                csv.setDelimiter("|");
+
+                from("direct:start")
+                        .unmarshal(csv)
+                        .split(body())
+                        .to("mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/53b6dcc8/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml
----------------------------------------------------------------------
diff --git a/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml
new file mode 100644
index 0000000..f510201
--- /dev/null
+++ b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+  <!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements. See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version
+    2.0 (the "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+    applicable law or agreed to in writing, software distributed under
+    the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+    OR CONDITIONS OF ANY KIND, either express or implied. See the
+    License for the specific language governing permissions and
+    limitations under the License.
+  -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="
+    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+    <route>
+      <from uri="direct:start" />
+      <unmarshal>
+        <csv delimiter="|" lazyLoad="true"/>
+      </unmarshal>
+      <to uri="mock:result" />
+    </route>
+  </camelContext>
+</beans>


Mime
View raw message