camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject svn commit: r749936 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/converter/stream/ main/java/org/apache/camel/util/ test/java/org/apache/camel/converter/stream/
Date Wed, 04 Mar 2009 07:34:18 GMT
Author: ningjiang
Date: Wed Mar  4 07:34:17 2009
New Revision: 749936

URL: http://svn.apache.org/viewvc?rev=749936&view=rev
Log:
CAMEL-1413 Cached the big message into the file instead of using memory directly

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
  (with props)
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/IOHelper.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java?rev=749936&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
Wed Mar  4 07:34:17 2009
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.StreamCache;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+
+public class CachedOutputStream extends OutputStream {
+    private static final File DEFAULT_TEMP_DIR;
+    private static final int DEFAULT_THRESHOLD;
+    static {
+        String s = System.getProperty("org.apache.camel.util.CachedOutputStream.Threshold",
+                                      "-1");
+        int i = Integer.parseInt(s);
+        if (i <= 0) {
+            i = 64 * 1024;
+        }
+        DEFAULT_THRESHOLD = i;
+        
+        s = System.getProperty("org.apache.camel.util.CachedOutputStream.OutputDirectory");
+        if (s != null) {
+            File f = new File(s);
+            if (f.exists() && f.isDirectory()) {
+                DEFAULT_TEMP_DIR = f;
+            } else {
+                DEFAULT_TEMP_DIR = null;
+            }
+        } else {
+            DEFAULT_TEMP_DIR = null;
+        }
+    }
+
+    protected boolean outputLocked;
+    protected OutputStream currentStream;
+
+    private long threshold = DEFAULT_THRESHOLD;
+
+    private int totalLength;
+
+    private boolean inmem;
+
+    private File tempFile;
+
+    private File outputDir = DEFAULT_TEMP_DIR;   
+    
+    private List<Object> streamList = new ArrayList<Object>();
+
+    
+    public CachedOutputStream() {
+        currentStream = new ByteArrayOutputStream(2048);
+        inmem = true;
+    }
+
+    public CachedOutputStream(long threshold) {
+        this.threshold = threshold; 
+        currentStream = new ByteArrayOutputStream(2048);
+        inmem = true;
+    }
+
+    /**
+     * Perform any actions required on stream flush (freeze headers, reset
+     * output stream ... etc.)
+     */
+    protected void doFlush() throws IOException {
+        
+    }
+
+    public void flush() throws IOException {
+        currentStream.flush();       
+        doFlush();
+    }
+
+    /**
+     * Perform any actions required on stream closure (handle response etc.)
+     */
+    protected void doClose() throws IOException {
+        
+    }
+    
+    /**
+     * Perform any actions required after stream closure (close the other related stream
etc.)
+     */
+    protected void postClose() throws IOException {
+        
+    }
+
+    /**
+     * Locks the output stream to prevent additional writes, but maintains
+     * a pointer to it so an InputStream can be obtained
+     * @throws IOException
+     */
+    public void lockOutputStream() throws IOException {
+        currentStream.flush();
+        outputLocked = true;
+        streamList.remove(currentStream);
+    }
+    
+    public void close() throws IOException {
+        currentStream.flush();        
+        doClose();
+        currentStream.close();
+        maybeDeleteTempFile(currentStream);
+        postClose();
+    }
+
+    public boolean equals(Object obj) {
+        return currentStream.equals(obj);
+    }
+
+    /**
+     * Replace the original stream with the new one, optionally copying the content of the
old one
+     * into the new one.
+     * When with Attachment, needs to replace the xml writer stream with the stream used
by
+     * AttachmentSerializer or copy the cached output stream to the "real"
+     * output stream, i.e. onto the wire.
+     * 
+     * @param out the new output stream
+     * @param copyOldContent flag indicating if the old content should be copied
+     * @throws IOException
+     */
+    public void resetOut(OutputStream out, boolean copyOldContent) throws IOException {
+        if (out == null) {
+            out = new ByteArrayOutputStream();
+        }
+
+        if (currentStream instanceof CachedOutputStream) {
+            CachedOutputStream ac = (CachedOutputStream) currentStream;
+            InputStream in = ac.getInputStream();
+            IOHelper.copyAndCloseInput(in, out);
+        } else {
+            if (inmem) {
+                if (currentStream instanceof ByteArrayOutputStream) {
+                    ByteArrayOutputStream byteOut = (ByteArrayOutputStream) currentStream;
+                    if (copyOldContent && byteOut.size() > 0) {
+                        byteOut.writeTo(out);
+                    }
+                } else {
+                    throw new IOException("Unknown format of currentStream");
+                }
+            } else {
+                // read the file
+                currentStream.close();
+                FileInputStream fin = new FileInputStream(tempFile);
+                if (copyOldContent) {
+                    IOHelper.copyAndCloseInput(fin, out);
+                }
+                streamList.remove(currentStream);
+                tempFile.delete();
+                tempFile = null;
+                inmem = true;
+            }
+        }
+        currentStream = out;
+        outputLocked = false;
+    }
+
+    public static void copyStream(InputStream in, OutputStream out, int bufferSize) throws
IOException {
+        IOHelper.copyAndCloseInput(in, out, bufferSize);
+    }
+
+    public int size() {
+        return totalLength;
+    }
+    
+    public byte[] getBytes() throws IOException {
+        flush();
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                return ((ByteArrayOutputStream)currentStream).toByteArray();
+            } else {
+                throw new IOException("Unknown format of currentStream");
+            }
+        } else {
+            // read the file
+            FileInputStream fin = new FileInputStream(tempFile);
+            return IOConverter.toBytes(fin);
+        }
+    }
+    
+    public void writeCacheTo(OutputStream out) throws IOException {
+        flush();
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                ((ByteArrayOutputStream)currentStream).writeTo(out);
+            } else {
+                throw new IOException("Unknown format of currentStream");
+            }
+        } else {
+            // read the file
+            FileInputStream fin = new FileInputStream(tempFile);
+            IOHelper.copyAndCloseInput(fin, out);
+        }
+    }
+    
+    
+    public void writeCacheTo(StringBuilder out, int limit) throws IOException {
+        flush();
+        if (totalLength < limit
+            || limit == -1) {
+            writeCacheTo(out);
+            return;
+        }
+        
+        int count = 0;
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                byte bytes[] = ((ByteArrayOutputStream)currentStream).toByteArray();
+                out.append(IOHelper.newStringFromBytes(bytes, 0, limit));
+            } else {
+                throw new IOException("Unknown format of currentStream");
+            }
+        } else {
+            // read the file
+            FileInputStream fin = new FileInputStream(tempFile);
+            byte bytes[] = new byte[1024];
+            int x = fin.read(bytes);
+            while (x != -1) {
+                if ((count + x) > limit) {
+                    x = limit - count;
+                }
+                out.append(IOHelper.newStringFromBytes(bytes, 0, x));
+                count += x;
+                
+                if (count >= limit) {
+                    x = -1;
+                } else {
+                    x = fin.read(bytes);
+                }
+            }
+            fin.close();
+        }
+    }
+    public void writeCacheTo(StringBuilder out) throws IOException {
+        flush();
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                byte[] bytes = ((ByteArrayOutputStream)currentStream).toByteArray();
+                out.append(IOHelper.newStringFromBytes(bytes));
+            } else {
+                throw new IOException("Unknown format of currentStream");
+            }
+        } else {
+            // read the file
+            FileInputStream fin = new FileInputStream(tempFile);
+            byte bytes[] = new byte[1024];
+            int x = fin.read(bytes);
+            while (x != -1) {
+                out.append(IOHelper.newStringFromBytes(bytes, 0, x));
+                x = fin.read(bytes);
+            }
+            fin.close();
+        }
+    }    
+    
+
+    /**
+     * @return the underlying output stream
+     */
+    public OutputStream getOut() {
+        return currentStream;
+    }
+
+    public int hashCode() {
+        return currentStream.hashCode();
+    }
+
+    public String toString() {
+        StringBuilder builder = new StringBuilder().append("[")
+            .append(CachedOutputStream.class.getName())
+            .append(" Content: ");
+        try {
+            writeCacheTo(builder);
+        } catch (IOException e) {
+            //ignore
+        }
+        return builder.append("]").toString();
+    }
+
+    protected void onWrite() throws IOException {
+        
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (!outputLocked) {
+            onWrite();
+            this.totalLength += len;
+            if (inmem && totalLength > threshold && currentStream instanceof
ByteArrayOutputStream) {
+                createFileOutputStream();
+            }
+            currentStream.write(b, off, len);
+        }
+    }
+
+    public void write(byte[] b) throws IOException {
+        if (!outputLocked) {
+            onWrite();
+            this.totalLength += b.length;
+            if (inmem && totalLength > threshold && currentStream instanceof
ByteArrayOutputStream) {
+                createFileOutputStream();
+            }
+            currentStream.write(b);
+        }
+    }
+
+    public void write(int b) throws IOException {
+        if (!outputLocked) {
+            onWrite();
+            this.totalLength++;
+            if (inmem && totalLength > threshold && currentStream instanceof
ByteArrayOutputStream) {
+                createFileOutputStream();
+            }
+            currentStream.write(b);
+        }
+    }
+
+    private void createFileOutputStream() throws IOException {
+        ByteArrayOutputStream bout = (ByteArrayOutputStream)currentStream;
+        if (outputDir == null) {
+            tempFile = FileUtil.createTempFile("cos", "tmp");
+        } else {
+            tempFile = FileUtil.createTempFile("cos", "tmp", outputDir, false);
+        }
+        
+        currentStream = new BufferedOutputStream(new FileOutputStream(tempFile));
+        bout.writeTo(currentStream);
+        inmem = false;
+        streamList.add(currentStream);
+    }
+
+    public File getTempFile() {
+        return tempFile != null && tempFile.exists() ? tempFile : null;
+    }
+
+    public InputStream getInputStream() throws IOException {
+        flush();
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                return new ByteArrayInputStream(((ByteArrayOutputStream) currentStream).toByteArray());
+            } else {
+                return null;
+            }
+        } else {
+            try {
+                FileInputStream fileInputStream = new FileInputStream(tempFile) {
+                    public void close() throws IOException {
+                        super.close();
+                        maybeDeleteTempFile(this);
+                    }
+                };
+                streamList.add(fileInputStream);
+                return fileInputStream;
+            } catch (FileNotFoundException e) {
+                throw new IOException("Cached file was deleted, " + e.toString());
+            }
+        }
+    }
+    
+    public StreamCache getStreamCache() throws IOException {
+        flush();
+        if (inmem) {
+            if (currentStream instanceof ByteArrayOutputStream) {
+                return new InputStreamCache(((ByteArrayOutputStream) currentStream).toByteArray());
+            } else {
+                return null;
+            }
+        } else {
+            try {
+                FileInputStreamCache fileInputStream = new FileInputStreamCache(tempFile,
this);
+                return fileInputStream;
+            } catch (FileNotFoundException e) {
+                throw new IOException("Cached file was deleted, " + e.toString());
+            }
+        }
+    }
+    
+    private void maybeDeleteTempFile(Object stream) {        
+        streamList.remove(stream);        
+        if (!inmem && tempFile != null && streamList.isEmpty()) {       
    
+            tempFile.delete();
+            tempFile = null;
+            currentStream = new ByteArrayOutputStream(1024);
+            inmem = true;
+        }
+    }
+
+    public void setOutputDir(File outputDir) throws IOException {
+        this.outputDir = outputDir;
+    }
+    public void setThreshold(long threshold) {
+        this.threshold = threshold;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/CachedOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java?rev=749936&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
Wed Mar  4 07:34:17 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.StreamCache;
+
+public class FileInputStreamCache extends InputStream implements StreamCache {
+    private FileInputStream inputStream;
+    private CachedOutputStream cachedOutputStream;
+    private File file;
+
+    public FileInputStreamCache(File file, CachedOutputStream cos) throws FileNotFoundException
{
+        this.file = file;
+        cachedOutputStream = cos;
+        inputStream = new FileInputStream(file);       
+    }
+    
+    public void close() {
+        try {
+            inputStream.close();
+            cachedOutputStream.close();
+        } catch (Exception exception) {
+            throw new RuntimeCamelException(exception);
+        } 
+    }
+
+    public void reset() {            
+        try {
+            inputStream.close();            
+            inputStream = new FileInputStream(file);
+        } catch (Exception exception) {
+            throw new RuntimeCamelException(exception);
+        }            
+    }
+    
+    public int available() throws IOException {
+        return inputStream.available();
+    }
+
+    @Override
+    public int read() throws IOException {
+        return inputStream.read();
+    }   
+    
+}   

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/FileInputStreamCache.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java?rev=749936&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
(added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
Wed Mar  4 07:34:17 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.converter.stream;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.camel.StreamCache;
+
+
+public class InputStreamCache extends ByteArrayInputStream implements StreamCache {
+
+    public InputStreamCache(byte[] data) {
+        super(data);
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/InputStreamCache.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java?rev=749936&r1=749935&r2=749936&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/converter/stream/StreamCacheConverter.java
Wed Mar  4 07:34:17 2009
@@ -17,6 +17,9 @@
 package org.apache.camel.converter.stream;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
@@ -28,9 +31,11 @@
 
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.converter.jaxp.BytesSource;
 import org.apache.camel.converter.jaxp.StringSource;
+import org.apache.camel.util.IOHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -67,8 +72,9 @@
 
     @Converter
     public StreamCache convertToStreamCache(InputStream stream, Exchange exchange) throws
IOException {
-        byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class, stream);
-        return new InputStreamCache(bytes);
+        CachedOutputStream cos = new CachedOutputStream();
+        IOHelper.copyAndCloseInput(stream, cos);       
+        return cos.getStreamCache();
     }
 
     @Converter
@@ -101,14 +107,15 @@
      * {@link StreamCache} implementation for Cache the StreamSource {@link StreamSource}s
      */
     private class StreamSourceCache extends StreamSource implements StreamCache {
-        InputStreamCache inputStreamCache;
+        StreamCache streamCache;
         ReaderCache readCache;
         
         public StreamSourceCache(StreamSource source, Exchange exchange) throws IOException
{
             if (source.getInputStream() != null) {
-                byte[] bytes = exchange.getContext().getTypeConverter().convertTo(byte[].class,
source.getInputStream());
-                inputStreamCache = new InputStreamCache(bytes);
-                setInputStream(inputStreamCache);
+                CachedOutputStream cos = new CachedOutputStream();
+                IOHelper.copyAndCloseInput(source.getInputStream(), cos);
+                streamCache = cos.getStreamCache();
+                setInputStream((InputStream)streamCache);
                 setSystemId(source.getSystemId());
             }
             if (source.getReader() != null) {
@@ -119,8 +126,8 @@
         }
 
         public void reset() {
-            if (inputStreamCache != null) {
-                inputStreamCache.reset();
+            if (streamCache != null) {
+                streamCache.reset();
             }
             if (readCache != null) {
                 readCache.reset();
@@ -128,15 +135,7 @@
         }
         
     }
-
-    private class InputStreamCache extends ByteArrayInputStream implements StreamCache {
-
-        public InputStreamCache(byte[] data) {
-            super(data);
-        }
-
-    }
-
+      
     private class ReaderCache extends StringReader implements StreamCache {
 
         public ReaderCache(String s) {
@@ -156,6 +155,8 @@
         }
 
     }
+    
+    
 
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java?rev=749936&r1=749935&r2=749936&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/FileUtil.java Wed Mar  4 07:34:17
2009
@@ -16,10 +16,26 @@
  */
 package org.apache.camel.util;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * File utilities
  */
 public final class FileUtil {
+    
+    private static final int RETRY_SLEEP_MILLIS = 10;
+    private static File defaultTempDir;
 
     private FileUtil() {
     }
@@ -34,5 +50,262 @@
         }
         return path;
     }
+    
+    private static synchronized File getDefaultTempDir() {
+        if (defaultTempDir != null
+            && defaultTempDir.exists()) {
+            return defaultTempDir;
+        }
+        
+        String s = null;
+        try {
+            s = System.getProperty(FileUtil.class.getName() + ".TempDirectory");
+        } catch (SecurityException e) {
+            //Ignorable, we'll use the default
+        }
+        if (s == null) {
+            int x = (int)(Math.random() * 1000000);
+            s = System.getProperty("java.io.tmpdir");
+            File checkExists = new File(s);
+            if (!checkExists.exists()) {
+                throw new RuntimeException("The directory " 
+                                       + checkExists.getAbsolutePath() 
+                                       + " does not exist, please set java.io.tempdir"
+                                       + " to an existing directory");
+            }
+            File f = new File(s, "camel-tmp-" + x);
+            while (!f.mkdir()) {
+                x = (int)(Math.random() * 1000000);
+                f = new File(s, "camel-tmp-" + x);
+            }
+            defaultTempDir = f;
+            Thread hook = new Thread() {
+                @Override
+                public void run() {
+                    removeDir(defaultTempDir);
+                }
+            };
+            Runtime.getRuntime().addShutdownHook(hook);            
+        } else {
+            //assume someone outside of us will manage the directory
+            File f = new File(s);
+            f.mkdirs();
+            defaultTempDir = f;
+        }
+        return defaultTempDir;
+    }
+
+    public static void mkDir(File dir) {
+        if (dir == null) {
+            throw new RuntimeException("dir attribute is required");
+        }
+
+        if (dir.isFile()) {
+            throw new RuntimeException("Unable to create directory as a file "
+                                    + "already exists with that name: " + dir.getAbsolutePath());
+        }
+
+        if (!dir.exists()) {
+            boolean result = doMkDirs(dir);
+            if (!result) {
+                String msg = "Directory " + dir.getAbsolutePath()
+                             + " creation was not successful for an unknown reason";
+                throw new RuntimeException(msg);
+            }
+        }
+    }
+
+    /**
+     * Attempt to fix possible race condition when creating directories on
+     * WinXP, also Windows2000. If the mkdirs does not work, wait a little and
+     * try again.
+     */
+    private static boolean doMkDirs(File f) {
+        if (!f.mkdirs()) {
+            try {
+                Thread.sleep(RETRY_SLEEP_MILLIS);
+                return f.mkdirs();
+            } catch (InterruptedException ex) {
+                return f.mkdirs();
+            }
+        }
+        return true;
+    }
+
+    public static void removeDir(File d) {
+        String[] list = d.list();
+        if (list == null) {
+            list = new String[0];
+        }
+        for (int i = 0; i < list.length; i++) {
+            String s = list[i];
+            File f = new File(d, s);
+            if (f.isDirectory()) {
+                removeDir(f);
+            } else {
+                delete(f);
+            }
+        }
+        delete(d);
+    }
+
+    public static void delete(File f) {
+        if (!f.delete()) {
+            if (isWindows()) {
+                System.gc();
+            }
+            try {
+                Thread.sleep(RETRY_SLEEP_MILLIS);
+            } catch (InterruptedException ex) {
+                // Ignore Exception
+            }
+            if (!f.delete()) {
+                f.deleteOnExit();
+            }
+        }
+    }
+
+    private static boolean isWindows() {
+        String osName = System.getProperty("os.name").toLowerCase(Locale.US);
+        return osName.indexOf("windows") > -1;
+    }
+
+    public static File createTempFile(String prefix, String suffix) throws IOException {
+        return createTempFile(prefix, suffix, null, false);
+    }
+    
+    public static File createTempFile(String prefix, String suffix, File parentDir,
+                               boolean deleteOnExit) throws IOException {
+        File result = null;
+        File parent = (parentDir == null)
+            ? getDefaultTempDir()
+            : parentDir;
+            
+        if (suffix == null) {
+            suffix = ".tmp";
+        }
+        if (prefix == null) {
+            prefix = "camel";
+        } else if (prefix.length() < 3) {
+            prefix = prefix + "camel";
+        }
+        result = File.createTempFile(prefix, suffix, parent);
+
+        //if parentDir is null, we're in our default dir
+        //which will get completely wiped on exit from our exit
+        //hook.  No need to set deleteOnExit() which leaks memory.
+        if (deleteOnExit && parentDir != null) {
+            result.deleteOnExit();
+        }
+        return result;
+    }
+    
+    public static String getStringFromFile(File location) {
+        InputStream is = null;
+        String result = null;
+
+        try {
+            is = new FileInputStream(location);
+            result = normalizeCRLF(is);
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (Exception e) {
+                    //do nothing
+                }
+            }
+        }
+
+        return result;
+    }
+
+    public static String normalizeCRLF(InputStream instream) {
+        BufferedReader in = new BufferedReader(new InputStreamReader(instream));
+        StringBuffer result = new StringBuffer();
+        String line = null;
+
+        try {
+            line = in.readLine();
+            while (line != null) {
+                String[] tok = line.split("\\s");
+
+                for (int x = 0; x < tok.length; x++) {
+                    String token = tok[x];
+                    result.append("  " + token);
+                }
+                line = in.readLine();
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+        }
+
+        String rtn = result.toString();
+
+        rtn = ignoreTokens(rtn, "<!--", "-->");
+        rtn = ignoreTokens(rtn, "/*", "*/");
+        return rtn;
+    }
+    
+    private static String ignoreTokens(final String contents, 
+                                       final String startToken, final String endToken) {
+        String rtn = contents;
+        int headerIndexStart = rtn.indexOf(startToken);
+        int headerIndexEnd = rtn.indexOf(endToken);
+        if (headerIndexStart != -1 && headerIndexEnd != -1 && headerIndexStart
< headerIndexEnd) {
+            rtn = rtn.substring(0, headerIndexStart - 1)
+                + rtn.substring(headerIndexEnd + endToken.length() + 1);
+        }
+        return rtn;
+    }
+
+    public static List<File> getFiles(File dir, final String pattern) {
+        return getFiles(dir, pattern, null);
+    }
+    public static List<File> getFilesRecurse(File dir, final String pattern) {
+        return getFilesRecurse(dir, pattern, null);
+    }
+
+    public static List<File> getFiles(File dir, final String pattern, File exclude)
{
+        return getFilesRecurse(dir, Pattern.compile(pattern), exclude, false, new ArrayList<File>());
+    }
+    public static List<File> getFilesRecurse(File dir, final String pattern, File exclude)
{
+        return getFilesRecurse(dir, Pattern.compile(pattern), exclude, true, new ArrayList<File>());
   
+    }
+    private static List<File> getFilesRecurse(File dir, 
+                                              Pattern pattern,
+                                              File exclude, boolean rec,
+                                              List<File> fileList) {
+        for (File file : dir.listFiles()) {
+            if (file.equals(exclude)) {
+                continue;
+            }
+            if (file.isDirectory() && rec) {
+                getFilesRecurse(file, pattern, exclude, rec, fileList);
+            } else {
+                Matcher m = pattern.matcher(file.getName());
+                if (m.matches()) {
+                    fileList.add(file);                                
+                }
+            }
+        }
+        return fileList;
+    }
+
+    public static List<String> readLines(File file) throws Exception {
+        if (!file.exists()) {
+            return new ArrayList<String>();
+        }
+        BufferedReader reader = new BufferedReader(new FileReader(file));
+        List<String> results = new ArrayList<String>();
+        String line = reader.readLine();
+        while (line != null) {
+            results.add(line);
+            line = reader.readLine();
+        }
+        return results;
+    }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/IOHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/IOHelper.java?rev=749936&r1=749935&r2=749936&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/IOHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/IOHelper.java Wed Mar  4 07:34:17
2009
@@ -19,6 +19,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.Charset;
 
 /**
  * IO helper class.
@@ -26,10 +28,46 @@
  * @version $Revision$
  */
 public final class IOHelper {
+    
+    private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+    private static final Charset UTF8_CHARSET = Charset.forName("utf-8");
 
     private IOHelper() {
         //Utility Class
     }
+    
+    /**
+     * Use this function instead of new String(byte[]) to avoid surprises from non-standard
default encodings.
+     * @param bytes
+     * @return
+     */
+    public static String newStringFromBytes(byte[] bytes) {
+        try {
+            return new String(bytes, UTF8_CHARSET.name());
+        } catch (UnsupportedEncodingException e) {
+            throw 
+                new RuntimeException("Impossible failure: Charset.forName(\"utf-8\") returns
invalid name.");
+
+        }
+    }
+
+    /**
+     * Use this function instead of new String(byte[], int, int) 
+     * to avoid surprises from non-standard default encodings.
+     * @param bytes
+     * @param start
+     * @param length
+     * @return
+     */
+    public static String newStringFromBytes(byte[] bytes, int start, int length) {
+        try {
+            return new String(bytes, start, length, UTF8_CHARSET.name());
+        } catch (UnsupportedEncodingException e) {
+            throw 
+                new RuntimeException("Impossible failure: Charset.forName(\"utf-8\") returns
invalid name.");
+
+        }
+    }
 
     /**
      * A factory method which creates an {@link IOException} from the given
@@ -49,14 +87,38 @@
         return answer;
     }
 
-    public static void copy(InputStream stream, OutputStream os) throws IOException {
-        byte[] data = new byte[4096];
-        int read = stream.read(data);
-        while (read != -1) {
-            os.write(data, 0, read);
-            read = stream.read(data);
+    public static int copy(InputStream input, OutputStream output) throws IOException {
+        return copy(input, output, DEFAULT_BUFFER_SIZE);
+    }
+    
+    public static int copy(final InputStream input, final OutputStream output, int bufferSize)
+        throws IOException {
+        int avail = input.available();
+        if (avail > 262144) {
+            avail = 262144;
+        }
+        if (avail > bufferSize) {
+            bufferSize = avail;
         }
-        os.flush();
+        final byte[] buffer = new byte[bufferSize];
+        int n = 0;
+        n = input.read(buffer);
+        int total = 0;
+        while (-1 != n) {
+            output.write(buffer, 0, n);
+            total += n;
+            n = input.read(buffer);
+        }
+        output.flush();
+        return total;
+    }
+    
+    public static void copyAndCloseInput(InputStream input, OutputStream output) throws IOException
{
+        copy(input, output);
+        input.close();
+    }
+    
+    public static void copyAndCloseInput(InputStream input, OutputStream output, int bufferSize)
throws IOException {
+        copy(input, output, bufferSize);
     }
-
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java?rev=749936&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
(added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
Wed Mar  4 07:34:17 2009
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.converter.stream;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+import org.apache.camel.StreamCache;
+import org.apache.camel.converter.IOConverter;
+import org.apache.camel.util.CollectionStringBuffer;
+
+public class CachedOutputStreamTest extends TestCase {
+    private static final String TEST_STRING = "This is a test string and it has enough" 
+        + " aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa ";
+    
+    private File file = new File("./target/cacheFile");
+
+    private static void deleteDirectory(File file) {
+        if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (int i = 0; i < files.length; i++) {
+                deleteDirectory(files[i]);
+            }
+        }
+        file.delete();
+    }
+    
+    private static String toString(InputStream input) throws IOException {        
+        BufferedReader reader = new BufferedReader(new InputStreamReader(input));
+        CollectionStringBuffer builder = new CollectionStringBuffer("\n");
+        while (true) {
+            String line = reader.readLine();
+            if (line == null) {
+                return builder.toString();
+            }
+            builder.append(line);
+        }
+    }
+    
+    protected void setUp() throws Exception {        
+        if (file.exists()) {
+            deleteDirectory(file);
+        }
+        file.mkdirs();
+    }
+       
+    public void testCacheStreamToFileAndCloseStream() throws IOException {       
+        
+        CachedOutputStream cos = new CachedOutputStream(16);
+        cos.setOutputDir(file);
+        cos.write(TEST_STRING.getBytes("UTF-8"));        
+        String[] files = file.list();
+        assertEquals("we should have a temp file", files.length, 1);
+        assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
+        
+        StreamCache cache = cos.getStreamCache();
+        assertTrue("Should get the FileInputStreamCache", cache instanceof FileInputStreamCache);
+        String temp = toString((InputStream)cache);
+        ((InputStream)cache).close();
+        assertEquals("Cached a wrong file", temp, TEST_STRING);
+        try {
+            cache.reset();
+            // The stream is closed, so the temp file is gone.
+            fail("we expect the exception here");
+        } catch (Exception exception) {
+            // do nothing
+        }
+        files = file.list();
+        assertEquals("we should have no temp file", files.length, 0);
+       
+    }
+    
+    public void testCacheStreamToFileAndNotCloseStream() throws IOException {       
+        
+        CachedOutputStream cos = new CachedOutputStream(16);
+        cos.setOutputDir(file);
+        cos.write(TEST_STRING.getBytes("UTF-8"));        
+        String[] files = file.list();
+        assertEquals("we should have a temp file", files.length, 1);
+        assertTrue("The file name should start with cos" , files[0].startsWith("cos"));
+        
+        StreamCache cache = cos.getStreamCache();
+        assertTrue("Should get the FileInputStreamCache", cache instanceof FileInputStreamCache);
+        String temp = toString((InputStream)cache);
+        assertEquals("Cached a wrong file", temp, TEST_STRING);
+        cache.reset();
+        temp = toString((InputStream)cache);
+        assertEquals("Cached a wrong file", temp, TEST_STRING);
+        
+        ((InputStream)cache).close();
+        files = file.list();
+        assertEquals("we should have no temp file", files.length, 0);       
+    }
+    
+    public void testCacheStreamToMemory() throws IOException {
+        CachedOutputStream cos = new CachedOutputStream();
+        cos.setOutputDir(file);
+        cos.write(TEST_STRING.getBytes("UTF-8"));        
+        String[] files = file.list();
+        assertEquals("we should have no temp file", files.length, 0);
+        StreamCache cache = cos.getStreamCache();
+        assertTrue("Should get the InputStreamCache", cache instanceof InputStreamCache);
+        String temp = IOConverter.toString((InputStream)cache);
+        assertEquals("Cached a wrong file", temp, TEST_STRING);
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/converter/stream/CachedOutputStreamTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message