flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From arv...@apache.org
Subject svn commit: r1182548 - in /incubator/flume/branches/flume-728: ./ flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/ flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/ flume-ng-node/ flume-ng-sinks/ flume-ng-sinks/flume-hdfs-sink/ flume-ng-si...
Date Wed, 12 Oct 2011 19:30:52 GMT
Author: arvind
Date: Wed Oct 12 19:30:51 2011
New Revision: 1182548

URL: http://svn.apache.org/viewvc?rev=1182548&view=rev
Log:
FLUME-774. Move HDFS sink into a separate module

(Prasad Mujumdar via Arvind Prabhakar)

Added:
    incubator/flume/branches/flume-728/flume-ng-sinks/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/pom.xml
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/pom.xml
Removed:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
Modified:
    incubator/flume/branches/flume-728/flume-ng-node/pom.xml
    incubator/flume/branches/flume-728/pom.xml

Modified: incubator/flume/branches/flume-728/flume-ng-node/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/pom.xml?rev=1182548&r1=1182547&r2=1182548&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/pom.xml (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/pom.xml Wed Oct 12 19:30:51 2011
@@ -23,6 +23,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-hdfs-sink</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/pom.xml?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/pom.xml Wed Oct 12 19:30:51 2011
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-hdfs-sink</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG HDFS </name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+      <version>0.9.5-SNAPSHOT</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-core</artifactId>
+    </dependency>
+
+  </dependencies>
+
+</project>

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class BucketWriter {
+
+  public static enum BucketFlushStatus {
+    BatchStarted, BatchPending, BatchFlushed
+  }
+
+  private HDFSWriter writer;
+  private FlumeFormatter formatter;
+  private long eventCounter;
+  private long processSize;
+  private long lastProcessTime;
+  private long fileExentionCounter;
+  private long batchCounter;
+  private String filePath;
+  private long rollInterval;
+  private long rollSize;
+  private long rollCount;
+  private long batchSize;
+  private CompressionCodec codeC;
+  private CompressionType compType;
+
+  // clear the class counters
+  private void resetCounters() {
+    eventCounter = 0;
+    processSize = 0;
+    lastProcessTime = 0;
+    batchCounter = 0;
+  }
+
+  // constructor. initialize the thresholds and open the file handle
+  public BucketWriter(long rollInt, long rollSz, long rollCnt, long bSize)
+      throws IOException {
+    rollInterval = rollInt;
+    rollSize = rollSz;
+    rollCount = rollCnt;
+    batchSize = bSize;
+
+    resetCounters();
+    fileExentionCounter = 0;
+    // open();
+  }
+
+  public void open() throws IOException {
+    if ((filePath == null) || (writer == null) || (formatter == null)) {
+      throw new IOException("Invalid file settings");
+    }
+
+    String bucketPath = filePath + "." + fileExentionCounter;
+    if (codeC == null) {
+      writer.open(bucketPath, formatter);
+    } else {
+      writer.open(bucketPath, codeC, compType, formatter);
+    }
+  }
+
+  public void open(String fPath, HDFSWriter hWriter, FlumeFormatter fmt)
+      throws IOException {
+    open(fPath, null, CompressionType.NONE, hWriter, fmt);
+  }
+
+  public void open(String fPath, CompressionCodec codec, CompressionType cType,
+      HDFSWriter hWriter, FlumeFormatter fmt) throws IOException {
+    filePath = fPath;
+    codeC = codec;
+    compType = cType;
+    writer = hWriter;
+    formatter = fmt;
+    open();
+  }
+
+  // close the file handle
+  public void close() throws IOException {
+    resetCounters();
+    if (writer != null) {
+      writer.close();
+      fileExentionCounter++;
+    }
+  }
+
+  // close the file, ignore the IOException
+  public void abort() {
+    try {
+      close();
+    } catch (IOException eIO) {
+      // Ignore it
+    }
+  }
+
+  // flush the data
+  public void flush() throws IOException {
+    writer.sync();
+    batchCounter = 0;
+  }
+
+  // handle the batching, do the real flush if its time
+  public BucketFlushStatus sync() throws IOException {
+    BucketFlushStatus syncStatus;
+
+    if ((batchCounter == batchSize)) {
+      flush();
+      syncStatus = BucketFlushStatus.BatchFlushed;
+    } else {
+      if (batchCounter == 1) {
+        syncStatus = BucketFlushStatus.BatchStarted;
+      } else {
+        syncStatus = BucketFlushStatus.BatchPending;
+      }
+    }
+    return syncStatus;
+  }
+
+  // append the data, update stats, handle roll and batching
+  public BucketFlushStatus append(Event e) throws IOException {
+    BucketFlushStatus syncStatus;
+
+    writer.append(e, formatter);
+
+    // update statistics
+    processSize += e.getBody().length;
+    lastProcessTime = System.currentTimeMillis() * 1000;
+    eventCounter++;
+    batchCounter++;
+
+    // check if its time to rotate the file
+    if (shouldRotate()) {
+      close();
+      open();
+      syncStatus = BucketFlushStatus.BatchFlushed;
+    } else {
+      syncStatus = sync();
+    }
+
+    return syncStatus;
+  }
+
+  // check if time to rotate the file
+  public boolean shouldRotate() {
+    boolean doRotate = false;
+
+    if ((rollInterval > 0)
+        && (rollInterval < (System.currentTimeMillis() - lastProcessTime) / 1000))
+      doRotate = true;
+    if ((rollCount > 0) && (rollCount < eventCounter)) {
+      eventCounter = 0;
+      doRotate = true;
+    }
+    if ((rollSize > 0) && (rollSize < processSize)) {
+      processSize = 0;
+      doRotate = true;
+    }
+
+    return doRotate;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+public class HDFSCompressedDataStream implements HDFSWriter {
+
+  private CompressionOutputStream outStream;
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    DefaultCodec defCodec = new DefaultCodec();
+    CompressionType cType = CompressionType.BLOCK;
+    open(filePath, defCodec, cType, fmt);
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codec,
+      CompressionType cType, FlumeFormatter fmt) throws IOException {
+
+    FSDataOutputStream fsOutStream;
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+
+    if (conf.getBoolean("hdfs.append.support", false) == true) {
+      fsOutStream = hdfs.append(dstPath);
+    } else {
+      fsOutStream = hdfs.create(dstPath);
+    }
+    outStream = codec.createOutputStream(fsOutStream);
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+    byte[] bValue = fmt.getBytes(e);
+    outStream.write(bValue, 0, bValue.length);
+  }
+
+  @Override
+  public void sync() throws IOException {
+    outStream.finish();
+  }
+
+  @Override
+  public void close() throws IOException {
+    outStream.close();
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+public class HDFSDataStream implements HDFSWriter {
+  private FSDataOutputStream outStream;
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+
+    if (conf.getBoolean("hdfs.append.support", false) == true) {
+      outStream = hdfs.append(dstPath);
+    } else {
+      outStream = hdfs.create(dstPath);
+    }
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codec,
+      CompressionType cType, FlumeFormatter fmt) throws IOException {
+    open(filePath, fmt);
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter fmt) throws IOException {
+    byte[] bValue = fmt.getBytes(e);
+    outStream.write(bValue, 0, bValue.length);
+  }
+
+  @Override
+  public void sync() throws IOException {
+    outStream.sync();
+  }
+
+  @Override
+  public void close() throws IOException {
+    outStream.close();
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSink;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.formatter.output.BucketPath;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.flume.sink.hdfs.BucketWriter.BucketFlushStatus;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HDFSEventSink extends AbstractSink implements PollableSink,
+    Configurable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(HDFSEventSink.class);
+
+  static final long defaultRollInterval = 30;
+  static final long defaultRollSize = 1024;
+  static final long defaultRollCount = 10;
+  static final String defaultFileName = "FlumeData";
+  static final String defaultBucketFormat = "%yyyy-%mm-%dd/%HH";
+  static final long defaultBatchSize = 1;
+  static final long defaultTxnEventMax = 100;
+  static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
+  static final int defaultMaxOpenFiles = 5000;
+  static final String defaultWriteFormat = HDFSFormatterFactory.hdfsWritableFormat;
+
+  private long rollInterval;
+  private long rollSize;
+  private long rollCount;
+  private long txnEventMax;
+  private long batchSize;
+  private CompressionCodec codeC;
+  private CompressionType compType;
+  private String fileType;
+  private String path;
+  private int maxOpenFiles;
+  private String writeFormat;
+
+  /*
+   * Extended Java LinkedHashMap for open file handle LRU queue We want to clear
+   * the oldest file handle if there are too many open ones
+   */
+  private class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
+    private static final long serialVersionUID = 1L;
+
+    protected boolean removeEldestEntry(Entry<String, BucketWriter> eldest) {
+      /*
+       * FIXME: We probably shouldn't shared state this way. Make this class
+       * private static and explicitly expose maxOpenFiles.
+       */
+      if (super.size() > maxOpenFiles) {
+        // If we have more that max open files, then close the last one and
+        // return true
+        try {
+          eldest.getValue().close();
+        } catch (IOException eI) {
+          LOG.warn(eldest.getKey().toString(), eI);
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  final WriterLinkedHashMap sfWriters = new WriterLinkedHashMap();
+
+  // Used to short-circuit around doing regex matches when we know there are
+  // no templates to be replaced.
+  // private boolean shouldSub = false;
+
+  public HDFSEventSink() {
+
+  }
+
+  // read configuration and setup thresholds
+  @Override
+  public void configure(Context context) {
+    String dirpath = context.get("hdfs.path", String.class);
+    String fileName = context.get("hdfs.filePrefix", String.class);
+    String rollInterval = context.get("hdfs.rollInterval", String.class);
+    String rollSize = context.get("hdfs.rollSize", String.class);
+    String rollCount = context.get("hdfs.rollCount", String.class);
+    String batchSize = context.get("hdfs.batchSize", String.class);
+    String txnEventMax = context.get("hdfs.txnEventMax", String.class);
+    String codecName = context.get("hdfs.codeC", String.class);
+    String fileType = context.get("hdfs.fileType", String.class);
+    String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
+    String writeFormat = context.get("hdfs.writeFormat", String.class);
+
+    if (fileName == null)
+      fileName = defaultFileName;
+    // FIXME: Not transportable.
+    this.path = new String(dirpath + "/" + fileName);
+
+    if (rollInterval == null) {
+      this.rollInterval = defaultRollInterval;
+    } else {
+      this.rollInterval = Long.parseLong(rollInterval);
+    }
+
+    if (rollSize == null) {
+      this.rollSize = defaultRollSize;
+    } else {
+      this.rollSize = Long.parseLong(rollSize);
+    }
+
+    if (rollCount == null) {
+      this.rollCount = defaultRollCount;
+    } else {
+      this.rollCount = Long.parseLong(rollCount);
+    }
+
+    if ((batchSize == null) || batchSize.equals("0")) {
+      this.batchSize = defaultBatchSize;
+    } else {
+      this.batchSize = Long.parseLong(batchSize);
+    }
+
+    if ((txnEventMax == null) || txnEventMax.equals("0")) {
+      this.txnEventMax = defaultTxnEventMax;
+    } else {
+      this.txnEventMax = Long.parseLong(txnEventMax);
+    }
+
+    if (codecName == null) {
+      codeC = null;
+      compType = CompressionType.NONE;
+    } else {
+      codeC = getCodec(codecName);
+      // TODO : set proper compression type
+      compType = CompressionType.BLOCK;
+    }
+
+    if (fileType == null) {
+      this.fileType = defaultFileType;
+    } else {
+      this.fileType = fileType;
+    }
+
+    if (maxOpenFiles == null) {
+      this.maxOpenFiles = defaultMaxOpenFiles;
+    } else {
+      this.maxOpenFiles = Integer.parseInt(maxOpenFiles);
+    }
+
+    if (writeFormat == null) {
+      this.writeFormat = defaultWriteFormat;
+    } else {
+      this.writeFormat = writeFormat;
+    }
+  }
+
+  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
+      String codecName) {
+    String simpleName = cls.getSimpleName();
+    if (cls.getName().equals(codecName)
+        || simpleName.equalsIgnoreCase(codecName)) {
+      return true;
+    }
+    if (simpleName.endsWith("Codec")) {
+      String prefix = simpleName.substring(0,
+          simpleName.length() - "Codec".length());
+      if (prefix.equalsIgnoreCase(codecName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static CompressionCodec getCodec(String codecName) {
+    Configuration conf = new Configuration();
+    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
+        .getCodecClasses(conf);
+    // Wish we could base this on DefaultCodec but appears not all codec's
+    // extend DefaultCodec(Lzo)
+    CompressionCodec codec = null;
+    ArrayList<String> codecStrs = new ArrayList<String>();
+    codecStrs.add("None");
+    for (Class<? extends CompressionCodec> cls : codecs) {
+      codecStrs.add(cls.getSimpleName());
+
+      if (codecMatches(cls, codecName)) {
+        try {
+          codec = cls.newInstance();
+        } catch (InstantiationException e) {
+          LOG.error("Unable to instantiate " + cls + " class");
+        } catch (IllegalAccessException e) {
+          LOG.error("Unable to access " + cls + " class");
+        }
+      }
+    }
+
+    if (codec == null) {
+      if (!codecName.equalsIgnoreCase("None")) {
+        throw new IllegalArgumentException("Unsupported compression codec "
+            + codecName + ".  Please choose from: " + codecStrs);
+      }
+    } else if (codec instanceof org.apache.hadoop.conf.Configurable) {
+      // Must check instanceof codec as BZip2Codec doesn't inherit Configurable
+      // Must set the configuration for Configurable objects that may or do use
+      // native libs
+      ((org.apache.hadoop.conf.Configurable) codec).setConf(conf);
+    }
+    return codec;
+  }
+
+  /**
+   * Pull events out of channel and send it to HDFS - take at the most
+   * txnEventMax, that's the maximum #events to hold in channel for a given
+   * transaction - find the corresponding bucket for the event, ensure the file
+   * is open - extract the pay-load and append to HDFS file
+   */
+  @Override
+  public Status process() throws EventDeliveryException {
+    Channel channel = getChannel();
+    Transaction transaction = channel.getTransaction();
+    Map<String, BucketWriter> batchMap = new HashMap<String, BucketWriter>();
+    BucketFlushStatus syncedUp;
+
+    try {
+      transaction.begin();
+      for (int txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
+        Event event = channel.take();
+        if (event == null)
+          break;
+
+        // reconstruct the path name by substituting place holders
+        String realPath = BucketPath.escapeString(path, event.getHeaders());
+        BucketWriter bw = sfWriters.get(realPath);
+
+        // we haven't seen this file yet, so open it and cache the handle
+        if (bw == null) {
+          HDFSWriter writer = HDFSWriterFactory.getWriter(fileType);
+          FlumeFormatter formatter = HDFSFormatterFactory
+              .getFormatter(writeFormat);
+          bw = new BucketWriter(rollInterval, rollSize, rollCount, batchSize);
+          bw.open(realPath, codeC, compType, writer, formatter);
+          sfWriters.put(realPath, bw);
+        }
+
+        // Write the data to HDFS
+        syncedUp = bw.append(event);
+
+        // keep track of the files in current batch that are not flushed
+        // we need to flush all those at the end of the transaction
+        if (syncedUp == BucketFlushStatus.BatchStarted)
+          batchMap.put(bw.getFilePath(), bw);
+        else if ((batchSize > 1)
+            && (syncedUp == BucketFlushStatus.BatchFlushed))
+          batchMap.remove(bw.getFilePath());
+      }
+
+      // flush any pending writes in the given transaction
+      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+        e.getValue().flush();
+      }
+      batchMap.clear();
+      transaction.commit();
+      return Status.READY;
+    } catch (IOException eIO) {
+      transaction.rollback();
+      LOG.error("HDFS IO error", eIO);
+      return Status.BACKOFF;
+    } catch (Exception e) {
+      transaction.rollback();
+      LOG.error("process failed", e);
+      throw new EventDeliveryException(e.getMessage());
+    } finally {
+      // clear any leftover writes in the given transaction
+      for (Entry<String, BucketWriter> e : batchMap.entrySet()) {
+        e.getValue().abort();
+      }
+      transaction.close();
+    }
+  }
+
+  @Override
+  public void stop() {
+    try {
+      for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
+        LOG.info("Closing " + e.getKey());
+        e.getValue().close();
+      }
+    } catch (IOException eIO) {
+      LOG.warn("IOException in opening file", eIO);
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    for (Entry<String, BucketWriter> e : sfWriters.entrySet()) {
+      try {
+        e.getValue().open();
+      } catch (IOException eIO) {
+        LOG.warn("IOException in opening file", eIO);
+      }
+    }
+    super.start();
+  }
+
+}
\ No newline at end of file

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.sink.FlumeFormatter;
+
+/**
+ * Factory that maps a configured output-format name to a concrete
+ * {@link FlumeFormatter} implementation.
+ */
+public class HDFSFormatterFactory {
+
+  HDFSFormatterFactory() {
+
+  }
+
+  static final String hdfsWritableFormat = "Writable";
+  static final String hdfsAvroFormat = "Avro";
+  static final String hdfsTextFormat = "Text";
+
+  /**
+   * Returns a new formatter for the requested format name.
+   *
+   * @param formatType configured format name ("Writable" or "Text")
+   * @throws IOException if the format name is not recognized
+   */
+  static FlumeFormatter getFormatter(String formatType) throws IOException {
+    // Compare with equals(): == is reference identity and only matches
+    // interned literals, not values read from a configuration file.
+    if (hdfsWritableFormat.equals(formatType)) {
+      return new HDFSWritableFormatter();
+    } else if (hdfsTextFormat.equals(formatType)) {
+      return new HDFSTextFormatter();
+    } else {
+      throw new IOException("Incorrect formatter type: " + formatType);
+    }
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * {@link HDFSWriter} that writes events to an HDFS {@link SequenceFile},
+ * using the formatter's key/value classes for the record schema.
+ */
+public class HDFSSequenceFile implements HDFSWriter {
+
+  private SequenceFile.Writer writer;
+
+  public HDFSSequenceFile() {
+    writer = null;
+  }
+
+  @Override
+  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+    // Delegate to the compressed variant with compression disabled.
+    open(filePath, null, CompressionType.NONE, fmt);
+  }
+
+  @Override
+  public void open(String filePath, CompressionCodec codeC,
+      CompressionType compType, FlumeFormatter fmt) throws IOException {
+    Configuration conf = new Configuration();
+    Path dstPath = new Path(filePath);
+    FileSystem hdfs = dstPath.getFileSystem(conf);
+
+    // When append is supported, reopen an existing file and continue
+    // writing to it; otherwise create (or overwrite) the destination.
+    if (conf.getBoolean("hdfs.append.support", false)) {
+      FSDataOutputStream outStream = hdfs.append(dstPath);
+      writer = SequenceFile.createWriter(conf, outStream, fmt.getKeyClass(),
+          fmt.getValueClass(), compType, codeC);
+    } else {
+      writer = SequenceFile.createWriter(hdfs, conf, dstPath,
+          fmt.getKeyClass(), fmt.getValueClass(), compType, codeC);
+    }
+  }
+
+  @Override
+  public void append(Event e, FlumeFormatter formatter) throws IOException {
+    writer.append(formatter.getKey(e), formatter.getValue(e));
+  }
+
+  @Override
+  public void sync() throws IOException {
+    writer.syncFs();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Guard against double-close: a second call would otherwise throw a
+    // NullPointerException because writer was already nulled out.
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,53 @@
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+//import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+
+
+/**
+ * Formats an event as a text record: the key is the event timestamp and
+ * the value is the raw event body wrapped in a {@link Text}.
+ *
+ * NOTE(review): unlike the sibling formatter classes this file lacks the
+ * Apache license header; it should be added for consistency.
+ */
+public class HDFSTextFormatter implements FlumeFormatter {
+
+  // Wraps the event body in a Text object.
+  private Text makeText(Event e) {
+    Text textObject = new Text();
+    textObject.set(e.getBody(), 0, e.getBody().length);
+    return textObject;
+  }
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<Text> getValueClass() {
+    return Text.class;
+  }
+
+  @Override
+  public Object getKey(Event e) {
+    // Use the event's "timestamp" header when present, otherwise fall
+    // back to the current wall-clock time.
+    String timestamp = e.getHeaders().get("timestamp");
+    long eventStamp;
+
+    if (timestamp == null) {
+      eventStamp = System.currentTimeMillis();
+    } else {
+      eventStamp = Long.parseLong(timestamp);
+    }
+    return new LongWritable(eventStamp);
+  }
+
+  @Override
+  public Object getValue(Event e) {
+    return makeText(e);
+  }
+
+  @Override
+  public byte[] getBytes(Event e) {
+    // Text.getBytes() exposes the internal buffer, which may be longer
+    // than the valid content; copy exactly getLength() bytes so callers
+    // never see trailing garbage.
+    Text text = makeText(e);
+    byte[] data = new byte[text.getLength()];
+    System.arraycopy(text.getBytes(), 0, data, 0, data.length);
+    return data;
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Formats an event as a Writable record: the key is the event timestamp
+ * and the value is the raw event body wrapped in a {@link BytesWritable}.
+ */
+public class HDFSWritableFormatter implements FlumeFormatter {
+
+  // Wraps the event body in a BytesWritable.
+  private BytesWritable makeByteWritable(Event e) {
+    BytesWritable bytesObject = new BytesWritable();
+    bytesObject.set(e.getBody(), 0, e.getBody().length);
+    return bytesObject;
+  }
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<BytesWritable> getValueClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public Object getKey(Event e) {
+    // Use the event's "timestamp" header when present, otherwise fall
+    // back to the current wall-clock time.
+    String timestamp = e.getHeaders().get("timestamp");
+    long eventStamp;
+
+    if (timestamp == null) {
+      eventStamp = System.currentTimeMillis();
+    } else {
+      eventStamp = Long.parseLong(timestamp);
+    }
+    return new LongWritable(eventStamp);
+  }
+
+  @Override
+  public Object getValue(Event e) {
+    return makeByteWritable(e);
+  }
+
+  @Override
+  public byte[] getBytes(Event e) {
+    // BytesWritable.getBytes() returns its internal buffer, which is
+    // typically padded beyond the valid length; copy exactly getLength()
+    // bytes so no spurious trailing zeros are emitted.
+    BytesWritable bytes = makeByteWritable(e);
+    byte[] data = new byte[bytes.getLength()];
+    System.arraycopy(bytes.getBytes(), 0, data, 0, data.length);
+    return data;
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+import org.apache.flume.Event;
+import org.apache.flume.sink.FlumeFormatter;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+
+/**
+ * Abstraction over the concrete HDFS output file formats (sequence file,
+ * raw data stream, compressed stream). Implementations are obtained via
+ * HDFSWriterFactory.
+ */
+public interface HDFSWriter {
+
+  /** Opens filePath for writing without compression. */
+  public void open(String filePath, FlumeFormatter fmt) throws IOException;
+
+  /** Opens filePath for writing with the given codec and compression type. */
+  public void open(String filePath, CompressionCodec codec,
+      CompressionType cType, FlumeFormatter fmt) throws IOException;
+
+  /** Appends a single event, serialized by fmt, to the open file. */
+  public void append(Event e, FlumeFormatter fmt) throws IOException;
+
+  /** Flushes buffered data to the file system. */
+  public void sync() throws IOException;
+
+  /** Closes the file; no further appends are valid afterwards. */
+  public void close() throws IOException;
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriterFactory.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+
+/**
+ * Factory that maps a configured file-type name to a concrete
+ * {@link HDFSWriter} implementation.
+ */
+abstract class HDFSWriterFactory {
+  static final String SequenceFileType = "SequenceFile";
+  static final String DataStreamType = "DataStream";
+  static final String CompStreamType = "CompressedStream";
+
+  public HDFSWriterFactory() {
+
+  }
+
+  /**
+   * Returns a new writer for the requested file type.
+   *
+   * @throws IOException if fileType is not a supported writer type
+   */
+  public static HDFSWriter getWriter(String fileType) throws IOException {
+    // Compare with equals(): == is reference identity and only matches
+    // interned literals, not file-type names read from configuration.
+    if (SequenceFileType.equals(fileType)) {
+      return new HDFSSequenceFile();
+    } else if (DataStreamType.equals(fileType)) {
+      return new HDFSDataStream();
+    } else if (CompStreamType.equals(fileType)) {
+      return new HDFSCompressedDataStream();
+    } else {
+      throw new IOException("File type " + fileType + " not supported");
+    }
+  }
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java Wed Oct 12 19:30:51 2011
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import java.io.IOException;
+import java.util.Calendar;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.SimpleEvent;
+import org.apache.flume.lifecycle.LifecycleException;
+import org.apache.flume.sink.hdfs.HDFSEventSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHDFSEventSink {
+
+  private HDFSEventSink sink;
+  private String testPath;
+
+  @Before
+  public void setUp() {
+    /*
+     * FIXME: Use a dynamic path to support concurrent test execution. Also,
+     * beware of the case where this path is used for something or when the
+     * Hadoop config points at file:/// rather than hdfs://. We need to find a
+     * better way of testing HDFS related functionality.
+     */
+    testPath = "/user/flume/testdata";
+    sink = new HDFSEventSink();
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  // Deletes and recreates the test directory on the configured FileSystem.
+  private void resetTestDirectory() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path dirPath = new Path(testPath);
+    fs.delete(dirPath, true);
+    fs.mkdirs(dirPath);
+  }
+
+  // Builds the sink configuration shared by the append tests.
+  private Context createContext(long txnMax, long rollCount, long batchSize,
+      String fileName) {
+    Context context = new Context();
+    context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
+    context.put("hdfs.filePrefix", fileName);
+    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
+    context.put("hdfs.rollCount", String.valueOf(rollCount));
+    context.put("hdfs.batchSize", String.valueOf(batchSize));
+    return context;
+  }
+
+  /**
+   * Pushes three batches of txnMax events through a memory channel and
+   * runs the sink over each batch. Shared by the append tests, which
+   * previously duplicated this entire body.
+   */
+  private void runAppendTest(Context context, long txnMax)
+      throws EventDeliveryException {
+    Configurables.configure(sink, context);
+
+    Channel channel = new MemoryChannel();
+    Configurables.configure(channel, context);
+
+    sink.setChannel(channel);
+    sink.start();
+
+    Transaction txn = channel.getTransaction();
+    Calendar eventDate = Calendar.getInstance();
+
+    // push the event batches into channel
+    for (int i = 1; i < 4; i++) {
+      txn.begin();
+      for (int j = 1; j <= txnMax; j++) {
+        Event event = new SimpleEvent();
+        eventDate.clear();
+        eventDate.set(2011, i, i, i, 0); // yy mm dd
+        event.getHeaders().put("timestamp",
+            String.valueOf(eventDate.getTimeInMillis()));
+        event.getHeaders().put("hostname", "Host" + i);
+
+        event.setBody(("Test." + i + "." + j).getBytes());
+        channel.put(event);
+      }
+      txn.commit();
+
+      // execute sink to process the events
+      sink.process();
+    }
+
+    sink.stop();
+
+    // TODO: read the generated files back and verify their contents once
+    // the HDFS test-environment issues noted in setUp() are resolved.
+  }
+
+  @Test
+  public void testLifecycle() throws InterruptedException, LifecycleException {
+    Context context = new Context();
+    context.put("hdfs.path", testPath);
+    Configurables.configure(sink, context);
+
+    sink.start();
+    sink.stop();
+  }
+
+  @Test
+  public void testAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+    final long txnMax = 25;
+    resetTestDirectory();
+    Context context = createContext(txnMax, 3, 2, "FlumeData");
+    runAppendTest(context, txnMax);
+  }
+
+  @Test
+  public void testTextAppend() throws InterruptedException, LifecycleException,
+      EventDeliveryException, IOException {
+    final long txnMax = 25;
+    resetTestDirectory();
+    Context context = createContext(txnMax, 3, 2, "FlumeData");
+    context.put("hdfs.writeFormat", "Text");
+    context.put("hdfs.fileType", "DataStream");
+    runAppendTest(context, txnMax);
+  }
+
+}

Added: incubator/flume/branches/flume-728/flume-ng-sinks/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/pom.xml?rev=1182548&view=auto
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/pom.xml (added)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/pom.xml Wed Oct 12 19:30:51 2011
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>flume-parent</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>0.9.5-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.flume</groupId>
+  <artifactId>flume-ng-sinks</artifactId>
+  <version>0.9.5-SNAPSHOT</version>
+  <name>Flume NG Sinks</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>flume-hdfs-sink</module>
+  </modules>
+</project>

Modified: incubator/flume/branches/flume-728/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/pom.xml?rev=1182548&r1=1182547&r2=1182548&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/pom.xml (original)
+++ incubator/flume/branches/flume-728/pom.xml Wed Oct 12 19:30:51 2011
@@ -107,6 +107,7 @@ limitations under the License.
       </activation>
       <modules>
         <module>flume-ng-core</module>
+        <module>flume-ng-sinks</module>
         <module>flume-ng-node</module>
         <module>flume-ng-dist</module>
         <module>flume-ng-channels</module>



Mime
View raw message