camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1441387 - in /camel/trunk/components/camel-hdfs/src: main/java/org/apache/camel/component/hdfs/ test/java/org/apache/camel/component/hdfs/ test/java/org/apache/camel/component/hdfs/integration/
Date Fri, 01 Feb 2013 11:14:26 GMT
Author: davsclaus
Date: Fri Feb  1 11:14:26 2013
New Revision: 1441387

URL: http://svn.apache.org/viewvc?rev=1441387&view=rev
Log:
CAMEL-5971: The hdfs producer now closes the stream after writing when no split strategy is
used. Otherwise the file would not be visible to other parties, and it would only be closed
when Camel is stopped, which is wrong.

Added:
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java   (with props)
Removed:
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
Modified:
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
    camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java
    camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java Fri Feb  1 11:14:26 2013
@@ -33,6 +33,7 @@ public class HdfsConfiguration {
     private String path;
     private boolean overwrite = true;
     private boolean append;
+    private boolean wantAppend;
     private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE;
     private short replication = HdfsConstants.DEFAULT_REPLICATION;
     private long blockSize = HdfsConstants.DEFAULT_BLOCKSIZE;
@@ -192,6 +193,7 @@ public class HdfsConfiguration {
 
         overwrite = getBoolean(hdfsSettings, "overwrite", overwrite);
         append = getBoolean(hdfsSettings, "append", append);
+        wantAppend = append;
         bufferSize = getInteger(hdfsSettings, "bufferSize", bufferSize);
         replication = getShort(hdfsSettings, "replication", replication);
         blockSize = getLong(hdfsSettings, "blockSize", blockSize);
@@ -254,6 +256,10 @@ public class HdfsConfiguration {
         return append;
     }
 
+    public boolean isWantAppend() {
+        return wantAppend;
+    }
+
     public void setAppend(boolean append) {
         this.append = append;
     }

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java Fri Feb  1 11:14:26 2013
@@ -48,10 +48,11 @@ public class HdfsOutputStream implements
         HdfsInfo info = new HdfsInfo(ret.actualPath);
 
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
-        if (configuration.isAppend()) {
+        if (configuration.isWantAppend() || configuration.isAppend()) {
             if (!info.getFileSystem().exists(new Path(ret.actualPath))) {
                 configuration.setAppend(false);
             } else {
+                configuration.setAppend(true);
                 info = new HdfsInfo(ret.suffixedPath);
                 info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
             }

Modified: camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/trunk/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Fri Feb  1 11:14:26 2013
@@ -181,8 +181,23 @@ public class HdfsProducer extends Defaul
             ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
         }
 
+        String path = ostream.getActualPath();
+        log.trace("Writing body to hdfs-file {}", path);
         ostream.append(key, body, exchange.getContext().getTypeConverter());
+
         idle.set(false);
+
+        // if no idle checker then we need to explicit close the stream after usage
+        if (scheduler == null) {
+            try {
+                ostream.close();
+                ostream = null;
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+
+        log.debug("Wrote body to hdfs-file {}", path);
     }
 
     private StringBuilder newFileName() {
@@ -215,6 +230,7 @@ public class HdfsProducer extends Defaul
             if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {
                 idle.set(true);
                 try {
+                    HdfsProducer.this.log.trace("Closing stream as idle");
                     ostream.close();
                 } catch (IOException e) {
                     // ignore

Added: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java?rev=1441387&view=auto
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java (added)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java Fri Feb  1 11:14:26 2013
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.File;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class FromFileToHdfsTest extends CamelTestSupport {
+
+    private static final Path TEMP_DIR = new Path(new File("target/outbox/").getAbsolutePath());
+
+    //Hadoop doesn't run on IBM JDK
+    private static final boolean SKIP = System.getProperty("java.vendor").contains("IBM");
+
+    @Before
+    public void setUp() throws Exception {
+        if (SKIP) {
+            return;
+        }
+        deleteDirectory("target/inbox");
+        deleteDirectory("target/outbox");
+        super.setUp();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (SKIP) {
+            return;
+        }
+
+        super.tearDown();
+        Configuration conf = new Configuration();
+        Path dir = new Path("target/outbox");
+        FileSystem fs = FileSystem.get(dir.toUri(), conf);
+        fs.delete(dir, true);
+    }
+
+    @Test
+    public void testFileToHdfs() throws Exception {
+        if (SKIP) {
+            return;
+        }
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+        template.sendBodyAndHeader("file:target/inbox", "Hello World", Exchange.FILE_NAME, "hello.txt");
+
+        notify.matchesMockWaitTime();
+
+        File delete = new File("target/inbox/hello.txt");
+        assertTrue("File should be deleted " + delete, !delete.exists());
+
+        File create = new File(TEMP_DIR + "/output.txt");
+        assertTrue("File should be created " + create, create.exists());
+    }
+
+    @Test
+    public void testTwoFilesToHdfs() throws Exception {
+        if (SKIP) {
+            return;
+        }
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(2).create();
+
+        template.sendBodyAndHeader("file:target/inbox", "Hello World", Exchange.FILE_NAME, "hello.txt");
+        template.sendBodyAndHeader("file:target/inbox", "Bye World", Exchange.FILE_NAME, "bye.txt");
+
+        notify.matchesMockWaitTime();
+
+        File delete = new File("target/inbox/hello.txt");
+        assertTrue("File should be deleted " + delete, !delete.exists());
+        delete = new File("target/inbox/bye.txt");
+        assertTrue("File should be deleted " + delete, !delete.exists());
+
+        File create = new File(TEMP_DIR + "/output.txt");
+        assertTrue("File should be created " + create, create.exists());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("file:target/inbox?delete=true")
+                    .to("hdfs:///" + TEMP_DIR.toUri() + "/output.txt?fileSystemType=LOCAL");
+            }
+        };
+    }
+}

Propchange: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java (original)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java Fri Feb  1 11:14:26 2013
@@ -58,22 +58,16 @@ public class HdfsProducerTest extends Ca
         if (SKIP) {
             return;
         }
-        for (int i = 0; i < 10; ++i) {
-            template.sendBody("direct:start1", "PAPPO" + i);
-        }
-        stopCamelContext();
+        template.sendBody("direct:start1", "PAPPO");
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-        int i = 0;
-        while (reader.next(key, value)) {
-            Text txt = (Text) value;
-            assertTrue(("PAPPO" + i).equals(txt.toString()));
-            ++i;
-        }
+        reader.next(key, value);
+        assertEquals("PAPPO", value.toString());
     }
 
     @Test
@@ -83,7 +77,7 @@ public class HdfsProducerTest extends Ca
         }
         Boolean aBoolean = true;
         template.sendBody("direct:write_boolean", aBoolean);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-boolean");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -102,7 +96,7 @@ public class HdfsProducerTest extends Ca
         }
         byte aByte = 8;
         template.sendBody("direct:write_byte", aByte);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-byte");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -121,7 +115,7 @@ public class HdfsProducerTest extends Ca
         }
         int anInt = 1234;
         template.sendBody("direct:write_int", anInt);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-int");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -140,7 +134,7 @@ public class HdfsProducerTest extends Ca
         }
         float aFloat = 12.34f;
         template.sendBody("direct:write_float", aFloat);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-float");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -159,7 +153,7 @@ public class HdfsProducerTest extends Ca
         }
         Double aDouble = 12.34D;
         template.sendBody("direct:write_double", aDouble);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-double");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -178,7 +172,7 @@ public class HdfsProducerTest extends Ca
         }
         long aLong = 1234567890;
         template.sendBody("direct:write_long", aLong);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-long");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -197,7 +191,7 @@ public class HdfsProducerTest extends Ca
         }
         String txt = "CIAO MONDO !";
         template.sendBody("direct:write_text1", txt);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -217,7 +211,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text2", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text2");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -237,7 +231,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text3");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -256,7 +250,7 @@ public class HdfsProducerTest extends Ca
         }
         String txtValue = "CIAO MONDO !";
         template.sendBody("direct:write_text4", txtValue);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text4");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -274,7 +268,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text5", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text5");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -292,7 +286,8 @@ public class HdfsProducerTest extends Ca
             return;
         }
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("target/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);

Modified: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java (original)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java Fri Feb  1 11:14:26 2013
@@ -63,7 +63,6 @@ public class HdfsAppendTest extends Came
         for (int i = 0; i < 10; ++i) {
             template.sendBody("direct:start1", "PIPPO");
         }
-        stopCamelContext();
 
         Configuration conf = new Configuration();
         Path file = new Path("hdfs://localhost:9000/tmp/test/test-camel-simple-write-file1");
@@ -82,7 +81,8 @@ public class HdfsAppendTest extends Came
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);

Modified: camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java?rev=1441387&r1=1441386&r2=1441387&view=diff
==============================================================================
--- camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java (original)
+++ camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java Fri Feb  1 11:14:26 2013
@@ -25,7 +25,6 @@ import org.apache.camel.test.junit4.Came
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -69,7 +68,8 @@ public class HdfsProducerConsumerIntegra
     @After
     public void tearDown() throws Exception {
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);



Mime
View raw message