camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/3] git commit: CAMEL-7101 Add aggregation strategy to aggregate multiple messages into a zip file with thanks to Pontus
Date Mon, 30 Dec 2013 02:40:47 GMT
CAMEL-7101 Add aggregation strategy to aggregate multiple messages into a zip file with thanks
to Pontus


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ea96a15
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ea96a15
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ea96a15

Branch: refs/heads/master
Commit: 4ea96a15434920069c6aba39d6bafed42a998276
Parents: d3e7083
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Mon Dec 30 10:37:17 2013 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Mon Dec 30 10:37:17 2013 +0800

----------------------------------------------------------------------
 .../zipfile/ZipAggregationStrategy.java         | 222 +++++++++++++++++++
 .../zipfile/ZipAggregationStrategyTest.java     |  79 +++++++
 .../camel/aggregate/zipfile/data/chiau.txt      |   1 +
 .../apache/camel/aggregate/zipfile/data/hi.txt  |   1 +
 .../camel/aggregate/zipfile/data/hola.txt       |   1 +
 5 files changed, 304 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
new file mode 100644
index 0000000..a028614
--- /dev/null
+++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.file.FileConsumer;
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.GenericFileMessage;
+import org.apache.camel.component.file.GenericFileOperationFailedException;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.util.FileUtil;
+
+/**
+ * This aggregation strategy will aggregate all incoming messages into a ZIP file.
+ * <p>If the incoming exchanges contain {@link GenericFileMessage} file name will 
+ * be taken from the body otherwise the body content will be treated as a byte 
+ * array and the ZIP entry will be named using the message id.</p>
+ * <p><b>Note:</b> Please note that this aggregation strategy requires
eager 
+ * completion check to work properly.</p>
+ * 
+ */
+public class ZipAggregationStrategy implements AggregationStrategy {
+
+    private String filePrefix;
+    private String fileSuffix = ".zip";
+
+    /**
+     * Gets the prefix used when creating the ZIP file name.
+     * @return the prefix
+     */
+    public String getFilePrefix() {
+        return filePrefix;
+    }
+
+    /**
+     * Sets the prefix that will be used when creating the ZIP filename.
+     * @param filePrefix prefix to use on ZIP file.
+     */
+    public void setFilePrefix(String filePrefix) {
+        this.filePrefix = filePrefix;
+    }
+    
+    /**
+     * Gets the suffix used when creating the ZIP file name.
+     * @return the suffix
+     */
+    public String getFileSuffix() {
+        return fileSuffix;
+    }
+    /**
+     * Sets the suffix that will be used when creating the ZIP filename.
+     * @param fileSuffix suffix to use on ZIP file.
+     */
+    public void setFileSuffix(String fileSuffix) {
+        this.fileSuffix = fileSuffix;
+    }
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        File zipFile;
+        Exchange answer = oldExchange;
+        
+        // Guard against empty new exchanges
+        if (newExchange == null) {
+            return oldExchange;
+        }
+    
+        // First time for this aggregation
+        if (oldExchange == null) {
+            try {
+                zipFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix);
+            } catch (IOException e) {
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
+            }
+            DefaultEndpoint endpoint = (DefaultEndpoint) newExchange.getFromEndpoint();
+            answer = endpoint.createExchange();
+            answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile));
+        } else {
+            zipFile = oldExchange.getIn().getBody(File.class);
+        }
+        
+        // Handle GenericFileMessages
+        if (GenericFileMessage.class.isAssignableFrom(newExchange.getIn().getClass())) {
+            try {
+                File appendFile =  newExchange.getIn().getBody(File.class);
+                if (appendFile != null) {
+                    addFilesToZip(zipFile, new File[]{appendFile});
+                    GenericFile<File> genericFile = 
+                        FileConsumer.asGenericFile(
+                            zipFile.getParent(), 
+                            zipFile, 
+                            Charset.defaultCharset().toString());
+                    genericFile.bindToExchange(answer);
+                } else {
+                    throw new GenericFileOperationFailedException("Could not get body as
file.");
+                }
+            } catch (IOException e) {
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
+            }
+        } else {
+            // Handle all other messages
+            byte[] buffer = newExchange.getIn().getBody(byte[].class);
+            try {
+                addEntryToZip(zipFile, newExchange.getIn().getMessageId(), buffer, buffer.length);
+                GenericFile<File> genericFile = FileConsumer.asGenericFile(
+                    zipFile.getParent(), zipFile, Charset.defaultCharset().toString());
+                genericFile.bindToExchange(answer);
+            } catch (IOException e) {
+                throw new GenericFileOperationFailedException(e.getMessage(), e);
+            }
+        }
+        
+        return answer;
+    }
+    
+    private static void addFilesToZip(File source, File[] files) throws IOException {
+        File tmpZip = File.createTempFile(source.getName(), null);
+        tmpZip.delete();
+        if (!source.renameTo(tmpZip)) {
+            throw new IOException("Could not make temp file (" + source.getName() + ")");
+        }
+        byte[] buffer = new byte[1024];
+        ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip));
+        ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source));
+
+        for (int i = 0; i < files.length; i++) {
+            InputStream in = new FileInputStream(files[i]);
+            out.putNextEntry(new ZipEntry(files[i].getName()));
+            for (int read = in.read(buffer); read > -1; read = in.read(buffer)) {
+                out.write(buffer, 0, read);
+            }
+            out.closeEntry();
+            in.close();
+        }
+
+        for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) {
+            out.putNextEntry(ze);
+            for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) {
+                out.write(buffer, 0, read);
+            }
+            out.closeEntry();
+        }
+        zin.close();
+        out.close();
+        tmpZip.delete();
+    }
+    
+    private static void addEntryToZip(File source, String entryName, byte[] buffer, int length)
throws IOException {
+
+        File tmpZip = File.createTempFile(source.getName(), null);
+        tmpZip.delete();
+        if (!source.renameTo(tmpZip)) {
+            throw new IOException("Could not make temp file (" + source.getName() + ")");
+        }
+        ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip));
+        ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source));
+        
+        out.putNextEntry(new ZipEntry(entryName));
+        out.write(buffer, 0, length);
+        out.closeEntry();
+
+        for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) {
+            out.putNextEntry(ze);
+            for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) {
+                out.write(buffer, 0, read);
+            }
+            out.closeEntry();
+        }
+        zin.close();
+        out.close();
+        tmpZip.delete();
+    }
+    
+    /**
+     * This callback class is used to clean up the temporary ZIP file once the exchange has
completed.
+     *
+     */
+    private class DeleteZipFileOnCompletion implements Synchronization {
+        
+        private File fileToDelete;
+        
+        public DeleteZipFileOnCompletion(File fileToDelete) {
+            this.fileToDelete = fileToDelete;
+        }
+        
+        @Override
+        public void onFailure(Exchange exchange) {
+            // Keep the file if somthing gone a miss.
+        }
+        
+        @Override
+        public void onComplete(Exchange exchange) {
+            FileUtil.deleteFile(this.fileToDelete);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java
new file mode 100644
index 0000000..be473fe
--- /dev/null
+++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.file.GenericFileMessage;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class ZipAggregationStrategyTest extends CamelTestSupport {
+
+    private static final int EXPECTED_NO_FILES = 3;
+
+    @Test
+    public void testSplitter() throws Exception {
+        MockEndpoint aggregateToZipEntry = getMockEndpoint("mock:aggregateToZipEntry");
+        aggregateToZipEntry.expectedMessageCount(1);
+        assertMockEndpointsSatisfied();
+
+        Exchange out = aggregateToZipEntry.getExchanges().get(0);
+        assertTrue("Result message does not contain GenericFileMessage", GenericFileMessage.class.isAssignableFrom(out.getIn().getClass()));
+        File resultFile = out.getIn().getBody(File.class);
+        assertNotNull(resultFile);
+        assertTrue("Zip file should exist", resultFile.isFile());
+        assertTrue("Result file name does not end with .zip", resultFile.getName().endsWith(".zip"));
+
+        ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile));
+        try {
+            int fileCount = 0;
+            for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) {
+                fileCount++;
+            }
+            assertTrue("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES
+ " files",
+                       fileCount == ZipAggregationStrategyTest.EXPECTED_NO_FILES);
+        } finally {
+            zin.close();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Unzip file and Split it according to FileEntry
+                from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?consumer.delay=1000&noop=true")
+                    .aggregate(new ZipAggregationStrategy())
+                        .constant(true)
+                        .completionFromBatchConsumer()
+                        .eagerCheckCompletion()
+                    .to("mock:aggregateToZipEntry")
+                    .log("Done processing big file: ${header.CamelFileName}");
+            }
+        };
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt
b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt
new file mode 100644
index 0000000..7842486
--- /dev/null
+++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt
@@ -0,0 +1 @@
+chau
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt
b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt
new file mode 100644
index 0000000..32f95c0
--- /dev/null
+++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt
@@ -0,0 +1 @@
+hi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4ea96a15/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt
----------------------------------------------------------------------
diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt
b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt
new file mode 100644
index 0000000..b8b4a4e
--- /dev/null
+++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt
@@ -0,0 +1 @@
+hola
\ No newline at end of file


Mime
View raw message