flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From esam...@apache.org
Subject svn commit: r1201433 - /incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Date Sun, 13 Nov 2011 14:51:03 GMT
Author: esammer
Date: Sun Nov 13 14:51:03 2011
New Revision: 1201433

URL: http://svn.apache.org/viewvc?rev=1201433&view=rev
Log:
FLUME-805: HDFS sink should mangle the names of incomplete files till they are closed

Modified:
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1201433&r1=1201432&r2=1201433&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
(original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
Sun Nov 13 14:51:03 2011
@@ -22,6 +22,9 @@ import java.io.IOException;
 
 import org.apache.flume.Event;
 import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
@@ -31,6 +34,7 @@ public class BucketWriter {
     BatchStarted, BatchPending, BatchFlushed
   }
 
+  private static final String IN_USE_EXT = ".tmp";
   private HDFSWriter writer;
   private FlumeFormatter formatter;
   private long eventCounter;
@@ -45,6 +49,7 @@ public class BucketWriter {
   private long batchSize;
   private CompressionCodec codeC;
   private CompressionType compType;
+  private String bucketPath;
 
   // clear the class counters
   private void resetCounters() {
@@ -72,11 +77,13 @@ public class BucketWriter {
       throw new IOException("Invalid file settings");
     }
 
-    String bucketPath = filePath + "." + fileExentionCounter;
     if (codeC == null) {
-      writer.open(bucketPath, formatter);
+      bucketPath = filePath + "." + fileExentionCounter;
+      writer.open(bucketPath + IN_USE_EXT, formatter);
     } else {
-      writer.open(bucketPath, codeC, compType, formatter);
+      bucketPath = filePath + "." + fileExentionCounter
+          + codeC.getDefaultExtension();
+      writer.open(bucketPath + IN_USE_EXT, codeC, compType, formatter);
     }
   }
 
@@ -102,6 +109,7 @@ public class BucketWriter {
       writer.close();
       fileExentionCounter++;
     }
+    renameBucket();
   }
 
   // close the file, ignore the IOException
@@ -185,4 +193,12 @@ public class BucketWriter {
     return filePath;
   }
 
+  private void renameBucket() throws IOException {
+    Configuration conf = new Configuration();
+    Path srcPath = new Path(bucketPath + IN_USE_EXT);
+    Path dstPath = new Path(bucketPath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+
+    hdfs.rename(srcPath, dstPath);
+  }
 }



Mime
View raw message