camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1441388 - in /camel/branches/camel-2.10.x: ./ components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/ components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/ components/camel-hdfs/src/test/java/org/apache/camel/co...
Date Fri, 01 Feb 2013 11:15:47 GMT
Author: davsclaus
Date: Fri Feb  1 11:15:47 2013
New Revision: 1441388

URL: http://svn.apache.org/viewvc?rev=1441388&view=rev
Log:
CAMEL-5971: hdfs producer will close the stream after writing if not using a split strategy,
as otherwise the file would not be visible to other parties, and the file would only be closed
when stopping Camel, which is very wrong.

Added:
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
      - copied unchanged from r1441387, camel/trunk/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
Removed:
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerFileWriteTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java
    camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1441387

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java Fri Feb  1 11:15:47 2013
@@ -33,6 +33,7 @@ public class HdfsConfiguration {
     private String path;
     private boolean overwrite = true;
     private boolean append;
+    private boolean wantAppend;
     private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE;
     private short replication = HdfsConstants.DEFAULT_REPLICATION;
     private long blockSize = HdfsConstants.DEFAULT_BLOCKSIZE;
@@ -192,6 +193,7 @@ public class HdfsConfiguration {
 
         overwrite = getBoolean(hdfsSettings, "overwrite", overwrite);
         append = getBoolean(hdfsSettings, "append", append);
+        wantAppend = append;
         bufferSize = getInteger(hdfsSettings, "bufferSize", bufferSize);
         replication = getShort(hdfsSettings, "replication", replication);
         blockSize = getLong(hdfsSettings, "blockSize", blockSize);
@@ -254,6 +256,10 @@ public class HdfsConfiguration {
         return append;
     }
 
+    public boolean isWantAppend() {
+        return wantAppend;
+    }
+
     public void setAppend(boolean append) {
         this.append = append;
     }

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java Fri Feb  1 11:15:47 2013
@@ -48,10 +48,11 @@ public class HdfsOutputStream implements
         HdfsInfo info = new HdfsInfo(ret.actualPath);
 
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
-        if (configuration.isAppend()) {
+        if (configuration.isWantAppend() || configuration.isAppend()) {
             if (!info.getFileSystem().exists(new Path(ret.actualPath))) {
                 configuration.setAppend(false);
             } else {
+                configuration.setAppend(true);
                 info = new HdfsInfo(ret.suffixedPath);
                 info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
             }

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java Fri Feb  1 11:15:47 2013
@@ -181,8 +181,23 @@ public class HdfsProducer extends Defaul
             ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
         }
 
+        String path = ostream.getActualPath();
+        log.trace("Writing body to hdfs-file {}", path);
         ostream.append(key, body, exchange.getContext().getTypeConverter());
+
         idle.set(false);
+
+        // if no idle checker then we need to explicit close the stream after usage
+        if (scheduler == null) {
+            try {
+                ostream.close();
+                ostream = null;
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+
+        log.debug("Wrote body to hdfs-file {}", path);
     }
 
     private StringBuilder newFileName() {
@@ -215,6 +230,7 @@ public class HdfsProducer extends Defaul
             if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {
                 idle.set(true);
                 try {
+                    HdfsProducer.this.log.trace("Closing stream as idle");
                     ostream.close();
                 } catch (IOException e) {
                     // ignore

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java Fri Feb  1 11:15:47 2013
@@ -58,22 +58,16 @@ public class HdfsProducerTest extends Ca
         if (SKIP) {
             return;
         }
-        for (int i = 0; i < 10; ++i) {
-            template.sendBody("direct:start1", "PAPPO" + i);
-        }
-        stopCamelContext();
+        template.sendBody("direct:start1", "PAPPO");
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
         SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf);
         Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
         Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-        int i = 0;
-        while (reader.next(key, value)) {
-            Text txt = (Text) value;
-            assertTrue(("PAPPO" + i).equals(txt.toString()));
-            ++i;
-        }
+        reader.next(key, value);
+        assertEquals("PAPPO", value.toString());
     }
 
     @Test
@@ -83,7 +77,7 @@ public class HdfsProducerTest extends Ca
         }
         Boolean aBoolean = true;
         template.sendBody("direct:write_boolean", aBoolean);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-boolean");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -102,7 +96,7 @@ public class HdfsProducerTest extends Ca
         }
         byte aByte = 8;
         template.sendBody("direct:write_byte", aByte);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-byte");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -121,7 +115,7 @@ public class HdfsProducerTest extends Ca
         }
         int anInt = 1234;
         template.sendBody("direct:write_int", anInt);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-int");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -140,7 +134,7 @@ public class HdfsProducerTest extends Ca
         }
         float aFloat = 12.34f;
         template.sendBody("direct:write_float", aFloat);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-float");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -159,7 +153,7 @@ public class HdfsProducerTest extends Ca
         }
         Double aDouble = 12.34D;
         template.sendBody("direct:write_double", aDouble);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-double");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -178,7 +172,7 @@ public class HdfsProducerTest extends Ca
         }
         long aLong = 1234567890;
         template.sendBody("direct:write_long", aLong);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-long");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -197,7 +191,7 @@ public class HdfsProducerTest extends Ca
         }
         String txt = "CIAO MONDO !";
         template.sendBody("direct:write_text1", txt);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text1");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -217,7 +211,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text2", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text2");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -237,7 +231,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text3");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -256,7 +250,7 @@ public class HdfsProducerTest extends Ca
         }
         String txtValue = "CIAO MONDO !";
         template.sendBody("direct:write_text4", txtValue);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text4");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -274,7 +268,7 @@ public class HdfsProducerTest extends Ca
         String txtKey = "THEKEY";
         String txtValue = "CIAO MONDO !";
         template.sendBodyAndHeader("direct:write_text5", txtValue, "KEY", txtKey);
-        stopCamelContext();
+
         Configuration conf = new Configuration();
         Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text5");
         FileSystem fs1 = FileSystem.get(file1.toUri(), conf);
@@ -292,7 +286,8 @@ public class HdfsProducerTest extends Ca
             return;
         }
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("target/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsAppendTest.java Fri Feb  1 11:15:47 2013
@@ -63,7 +63,6 @@ public class HdfsAppendTest extends Came
         for (int i = 0; i < 10; ++i) {
             template.sendBody("direct:start1", "PIPPO");
         }
-        stopCamelContext();
 
         Configuration conf = new Configuration();
         Path file = new Path("hdfs://localhost:9000/tmp/test/test-camel-simple-write-file1");
@@ -82,7 +81,8 @@ public class HdfsAppendTest extends Came
     @Override
     public void tearDown() throws Exception {
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);

Modified: camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java?rev=1441388&r1=1441387&r2=1441388&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/integration/HdfsProducerConsumerIntegrationTest.java Fri Feb  1 11:15:47 2013
@@ -25,7 +25,6 @@ import org.apache.camel.test.junit4.Came
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.junit.After;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -69,7 +68,8 @@ public class HdfsProducerConsumerIntegra
     @After
     public void tearDown() throws Exception {
         super.tearDown();
-        Thread.sleep(100);
+
+        Thread.sleep(250);
         Configuration conf = new Configuration();
         Path dir = new Path("hdfs://localhost:9000/tmp/test");
         FileSystem fs = FileSystem.get(dir.toUri(), conf);



Mime
View raw message