flume-commits mailing list archives

From mpe...@apache.org
Subject git commit: FLUME-1100. HDFSWriterFactory and HDFSFormatterFactory should allow extension.
Date Wed, 19 Dec 2012 01:22:02 GMT
Updated Branches:
  refs/heads/trunk 112e80a22 -> 0dacb250a


FLUME-1100. HDFSWriterFactory and HDFSFormatterFactory should allow extension.

(Chris Birchall via Mike Percy)
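
This change removes FlumeFormatter from the HDFSWriter interface and makes the SequenceFile key/value formatter pluggable: HDFSSequenceFile now reads hdfs.writeFormat itself, and SeqFileFormatterFactory treats any value other than Writable or Text as the fully qualified name of a SeqFileFormatter.Builder class to load. A minimal sketch of a custom formatter, modelled on the MyCustomFormatter test class added in this commit (the package, class name, and keying scheme below are illustrative only):

package com.example.flume;   // illustrative package, not part of this commit

import java.util.Collections;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.sink.hdfs.SeqFileFormatter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;

public class TimestampedBytesFormatter implements SeqFileFormatter {

  @Override
  public Class<LongWritable> getKeyClass() {
    return LongWritable.class;
  }

  @Override
  public Class<BytesWritable> getValueClass() {
    return BytesWritable.class;
  }

  @Override
  public Iterable<Record> format(Event e) {
    // one SequenceFile record per event: (arrival time, raw event body)
    return Collections.singletonList(
        new Record(new LongWritable(System.currentTimeMillis()),
                   new BytesWritable(e.getBody())));
  }

  // Builders must be public and provide a no-arg constructor (see SeqFileFormatter.Builder)
  public static class Builder implements SeqFileFormatter.Builder {
    @Override
    public SeqFileFormatter build(Context context) {
      return new TimestampedBytesFormatter();
    }
  }
}

Assuming the sink context keys map as in HDFSSequenceFile.configure() below, such a formatter would be selected with a property along the lines of agent.sinks.k1.hdfs.writeFormat = com.example.flume.TimestampedBytesFormatter$Builder.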


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0dacb250
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0dacb250
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0dacb250

Branch: refs/heads/trunk
Commit: 0dacb250aab933abc077f8f72a616e5005a06135
Parents: 112e80a
Author: Mike Percy <mpercy@apache.org>
Authored: Tue Dec 18 17:02:08 2012 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Tue Dec 18 17:02:08 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/sink/FlumeFormatter.java |   37 ------
 .../org/apache/flume/sink/hdfs/BucketWriter.java   |   15 +--
 .../flume/sink/hdfs/HDFSCompressedDataStream.java  |    9 +-
 .../org/apache/flume/sink/hdfs/HDFSDataStream.java |    9 +-
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   86 ++++++--------
 .../flume/sink/hdfs/HDFSFormatterFactory.java      |   43 -------
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |   30 +++--
 .../apache/flume/sink/hdfs/HDFSTextFormatter.java  |   36 +++---
 .../flume/sink/hdfs/HDFSWritableFormatter.java     |   32 ++++--
 .../org/apache/flume/sink/hdfs/HDFSWriter.java     |   12 +--
 .../apache/flume/sink/hdfs/SeqFileFormatter.java   |   68 +++++++++++
 .../flume/sink/hdfs/SeqFileFormatterFactory.java   |   89 +++++++++++++++
 .../flume/sink/hdfs/SeqFileFormatterType.java      |   37 ++++++
 .../apache/flume/sink/hdfs/HDFSBadDataStream.java  |    8 +-
 .../apache/flume/sink/hdfs/HDFSBadSeqWriter.java   |   11 +-
 .../org/apache/flume/sink/hdfs/MockHDFSWriter.java |    7 +-
 .../apache/flume/sink/hdfs/MyCustomFormatter.java  |   58 ++++++++++
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |   30 ++---
 .../sink/hdfs/TestHDFSCompressedDataStream.java    |   14 +--
 .../sink/hdfs/TestSeqFileFormatterFactory.java     |   58 ++++++++++
 20 files changed, 450 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java b/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
deleted file mode 100644
index 05d60b7..0000000
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink;
-
-import org.apache.flume.Event;
-
-public interface FlumeFormatter {
-
-  @SuppressWarnings("rawtypes")
-  Class getKeyClass();
-
-  @SuppressWarnings("rawtypes")
-  Class getValueClass();
-
-  Object getKey(Event e);
-
-  Object getValue(Event e);
-
-  byte[] getBytes(Event e);
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 002d48d..f14f7cb 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -31,7 +31,6 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.SystemClock;
 import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -62,7 +61,6 @@ class BucketWriter {
   private static final Integer staticLock = new Integer(1);
 
   private final HDFSWriter writer;
-  private final FlumeFormatter formatter;
   private final long rollInterval;
   private final long rollSize;
   private final long rollCount;
@@ -104,7 +102,7 @@ class BucketWriter {
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
       Context context, String filePath, String fileName, String inUsePrefix,
       String inUseSuffix, String fileSuffix, CompressionCodec codeC,
-      CompressionType compType, HDFSWriter writer, FlumeFormatter formatter,
+      CompressionType compType, HDFSWriter writer,
       ScheduledExecutorService timedRollerPool, UserGroupInformation user,
       SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
       String onIdleCallbackPath) {
@@ -120,7 +118,6 @@ class BucketWriter {
     this.codeC = codeC;
     this.compType = compType;
     this.writer = writer;
-    this.formatter = formatter;
     this.timedRollerPool = timedRollerPool;
     this.user = user;
     this.sinkCounter = sinkCounter;
@@ -191,7 +188,7 @@ class BucketWriter {
    * @throws InterruptedException
    */
   private void doOpen() throws IOException, InterruptedException {
-    if ((filePath == null) || (writer == null) || (formatter == null)) {
+    if ((filePath == null) || (writer == null)) {
       throw new IOException("Invalid file settings");
     }
 
@@ -229,11 +226,13 @@ class BucketWriter {
           // Need to get reference to FS using above config before underlying
           // writer does in order to avoid shutdown hook & IllegalStateExceptions
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath, formatter);
+          LOG.info("Creating " + bucketPath);
+          writer.open(bucketPath);
         } else {
           // need to get reference to FS before writer does to avoid shutdown hook
           fileSystem = new Path(bucketPath).getFileSystem(config);
-          writer.open(bucketPath, codeC, compType, formatter);
+          LOG.info("Creating " + bucketPath);
+          writer.open(bucketPath, codeC, compType);
         }
       } catch (Exception ex) {
         sinkCounter.incrementConnectionFailedCount();
@@ -403,7 +402,7 @@ class BucketWriter {
     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
-      writer.append(event, formatter); // could block
+      writer.append(event); // could block
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
           bucketPath + ") and rethrowing exception.",

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index afcd9d6..18fe6d4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -23,7 +23,6 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.EventSerializer;
 import org.apache.flume.serialization.EventSerializerFactory;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -56,15 +55,15 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   }
 
   @Override
-  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+  public void open(String filePath) throws IOException {
     DefaultCodec defCodec = new DefaultCodec();
     CompressionType cType = CompressionType.BLOCK;
-    open(filePath, defCodec, cType, fmt);
+    open(filePath, defCodec, cType);
   }
 
   @Override
   public void open(String filePath, CompressionCodec codec,
-      CompressionType cType, FlumeFormatter fmt) throws IOException {
+      CompressionType cType) throws IOException {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
@@ -95,7 +94,7 @@ public class HDFSCompressedDataStream implements HDFSWriter {
   }
 
   @Override
-  public void append(Event e, FlumeFormatter fmt) throws IOException {
+  public void append(Event e) throws IOException {
     if (isFinished) {
       cmpOut.resetState();
       isFinished = false;

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
index 2ac06d7..bd40a88 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java
@@ -23,7 +23,6 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.EventSerializer;
 import org.apache.flume.serialization.EventSerializerFactory;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,7 +45,7 @@ public class HDFSDataStream implements HDFSWriter {
   }
 
   @Override
-  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+  public void open(String filePath) throws IOException {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
@@ -78,12 +77,12 @@ public class HDFSDataStream implements HDFSWriter {
 
   @Override
   public void open(String filePath, CompressionCodec codec,
-      CompressionType cType, FlumeFormatter fmt) throws IOException {
-    open(filePath, fmt);
+                   CompressionType cType) throws IOException {
+    open(filePath);
   }
 
   @Override
-  public void append(Event e, FlumeFormatter fmt) throws IOException {
+  public void append(Event e) throws IOException {
     // shun flumeformatter...
     serializer.write(e);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 230488e..e980d13 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -46,7 +46,6 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -124,7 +123,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private String inUseSuffix;
   private TimeZone timeZone;
   private int maxOpenFiles;
-  private String writeFormat;
   private ExecutorService callTimeoutPool;
   private ScheduledExecutorService timedRollerPool;
 
@@ -185,7 +183,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     this.writerFactory = writerFactory;
   }
 
-    // read configuration and setup thresholds
+  // read configuration and setup thresholds
+  @Override
   public void configure(Context context) {
     this.context = context;
 
@@ -205,7 +204,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     String codecName = context.getString("hdfs.codeC");
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
-    writeFormat = context.getString("hdfs.writeFormat");
     callTimeout = context.getLong("hdfs.callTimeout", defaultCallTimeout);
     threadsPoolSize = context.getInteger("hdfs.threadsPoolSize",
         defaultThreadPoolSize);
@@ -240,18 +238,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           + " when fileType is: " + fileType);
     }
 
-    if (writeFormat == null) {
-      // Default write formatter is chosen by requested file type
-      if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
-         || fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
-        // Output is written into text files, by default separate events by \n
-        this.writeFormat = HDFSFormatterFactory.hdfsTextFormat;
-      } else {
-        // Output is written into binary files, so use binary writable format
-        this.writeFormat = HDFSFormatterFactory.hdfsWritableFormat;
-      }
-    }
-
     if (!authenticate()) {
       LOG.error("Failed to authenticate!");
     }
@@ -381,6 +367,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       throw ex;
     }
   }
+
   /**
    * Pull events out of channel and send it to HDFS. Take at most batchSize
    * events per Transaction. Find the corresponding bucket for the event.
@@ -413,8 +400,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         // we haven't seen this file yet, so open it and cache the handle
         if (bucketWriter == null) {
           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType);
-          FlumeFormatter formatter = HDFSFormatterFactory
-              .getFormatter(writeFormat);
 
           WriterCallback idleCallback = null;
           if(idleTimeout != 0) {
@@ -427,7 +412,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           }
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-              suffix, codeC, compType, hdfsWriter, formatter, timedRollerPool,
+              suffix, codeC, compType, hdfsWriter, timedRollerPool,
               proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
 
           sfWriters.put(lookupPath, bucketWriter);
@@ -490,7 +475,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         close(entry.getValue());
       } catch (Exception ex) {
         LOG.warn("Exception while closing " + entry.getKey() + ". " +
-            "Exception follows.", ex);
+                "Exception follows.", ex);
         if (ex instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
@@ -498,13 +483,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
 
     // shut down all our thread pools
-    ExecutorService toShutdown[] = { callTimeoutPool, timedRollerPool };
+    ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool};
     for (ExecutorService execService : toShutdown) {
       execService.shutdown();
       try {
         while (execService.isTerminated() == false) {
           execService.awaitTermination(
-              Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
+                  Math.max(defaultCallTimeout, callTimeout), TimeUnit.MILLISECONDS);
         }
       } catch (InterruptedException ex) {
         LOG.warn("shutdown interrupted on " + execService, ex);
@@ -524,11 +509,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   public void start() {
     String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
     callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
+            new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
     String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
     timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(rollerName).build());
+            new ThreadFactoryBuilder().setNameFormat(rollerName).build());
 
     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
     sinkCounter.start();
@@ -547,12 +532,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       // sanity checking
       if (kerbConfPrincipal.isEmpty()) {
         LOG.error("Hadoop running in secure mode, but Flume config doesn't "
-            + "specify a principal to use for Kerberos auth.");
+                + "specify a principal to use for Kerberos auth.");
         return false;
       }
       if (kerbKeytab.isEmpty()) {
         LOG.error("Hadoop running in secure mode, but Flume config doesn't "
-            + "specify a keytab to use for Kerberos auth.");
+                + "specify a keytab to use for Kerberos auth.");
         return false;
       } else {
         //If keytab is specified, user should want it take effect.
@@ -560,8 +545,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         File kfile = new File(kerbKeytab);
         if (!(kfile.isFile() && kfile.canRead())) {
           throw new IllegalArgumentException("The keyTab file: "
-              + kerbKeytab + " is nonexistent or can't read. "
-              + "Please specify a readable keytab file for Kerberos auth.");
+                  + kerbKeytab + " is nonexistent or can't read. "
+                  + "Please specify a readable keytab file for Kerberos auth.");
         }
       }
 
@@ -572,7 +557,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
       } catch (IOException e) {
         LOG.error("Host lookup error resolving kerberos principal ("
-            + kerbConfPrincipal + "). Exception follows.", e);
+                + kerbConfPrincipal + "). Exception follows.", e);
         return false;
       }
 
@@ -587,9 +572,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       // since we don't have to be unnecessarily protective if they switch all
       // HDFS sinks to use a different principal all at once.
       Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
-          "Cannot use multiple kerberos principals in the same agent. " +
-          " Must restart agent to use new principal or keytab. " +
-          "Previous = %s, New = %s", prevUser, newUser);
+              "Cannot use multiple kerberos principals in the same agent. " +
+                      " Must restart agent to use new principal or keytab. " +
+                      "Previous = %s, New = %s", prevUser, newUser);
 
       // attempt to use cached credential if the user is the same
       // this is polite and should avoid flooding the KDC with auth requests
@@ -599,7 +584,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           curUser = UserGroupInformation.getLoginUser();
         } catch (IOException e) {
           LOG.warn("User unexpectedly had no active login. Continuing with " +
-              "authentication", e);
+                  "authentication", e);
         }
       }
 
@@ -609,8 +594,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           kerberosLogin(this, principal, kerbKeytab);
         } catch (IOException e) {
           LOG.error("Authentication or file read error while attempting to "
-              + "login as kerberos principal (" + principal + ") using "
-              + "keytab (" + kerbKeytab + "). Exception follows.", e);
+                  + "login as kerberos principal (" + principal + ") using "
+                  + "keytab (" + kerbKeytab + "). Exception follows.", e);
           return false;
         }
       } else {
@@ -626,7 +611,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     if (!proxyUserName.isEmpty()) {
       try {
         proxyTicket = UserGroupInformation.createProxyUser(
-            proxyUserName, UserGroupInformation.getLoginUser());
+                proxyUserName, UserGroupInformation.getLoginUser());
       } catch (IOException e) {
         LOG.error("Unable to login as proxy user. Exception follows.", e);
         return false;
@@ -641,7 +626,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         ugi = UserGroupInformation.getLoginUser();
       } catch (IOException e) {
         LOG.error("Unexpected error: Unable to get authenticated user after " +
-            "apparent successful login! Exception follows.", e);
+                "apparent successful login! Exception follows.", e);
         return false;
       }
     }
@@ -661,7 +646,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
         } catch (IOException e) {
           LOG.error("Unexpected error: unknown superuser impersonating proxy.",
-              e);
+                  e);
           return false;
         }
       }
@@ -685,13 +670,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
    * In addition, since the underlying Hadoop APIs we are using for
    * impersonation are static, we define this method as static as well.
    *
-   * @param principal Fully-qualified principal to use for authentication.
-   * @param keytab Location of keytab file containing credentials for principal.
+   * @param principal
+   *         Fully-qualified principal to use for authentication.
+   * @param keytab
+   *         Location of keytab file containing credentials for principal.
    * @return Logged-in user
-   * @throws IOException if login fails.
+   * @throws IOException
+   *         if login fails.
    */
   private static synchronized UserGroupInformation kerberosLogin(
-      HDFSEventSink sink, String principal, String keytab) throws IOException {
+          HDFSEventSink sink, String principal, String keytab) throws IOException {
 
     // if we are the 2nd user thru the lock, the login should already be
     // available statically if login was successful
@@ -707,13 +695,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     // we already have logged in successfully
     if (curUser != null && curUser.getUserName().equals(principal)) {
       LOG.debug("{}: Using existing principal ({}): {}",
-          new Object[] { sink, principal, curUser });
+              new Object[]{sink, principal, curUser});
 
-    // no principal found
+      // no principal found
     } else {
 
       LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
-          "file ({})", new Object[] { sink, principal, keytab });
+              "file ({})", new Object[]{sink, principal, keytab});
 
       // attempt static kerberos login
       UserGroupInformation.loginUserFromKeytab(principal, keytab);
@@ -726,14 +714,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   @Override
   public String toString() {
     return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
-        " }";
+            " }";
   }
 
   /**
    * Append to bucket writer with timeout enforced
    */
   private void append(final BucketWriter bucketWriter, final Event event)
-      throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
 
     // Write the data to HDFS
     callWithTimeout(new Callable<Void>() {
@@ -748,7 +736,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
    * Flush bucket writer with timeout enforced
    */
   private void flush(final BucketWriter bucketWriter)
-      throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
 
     callWithTimeout(new Callable<Void>() {
       public Void call() throws Exception {
@@ -762,7 +750,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
    * Close bucket writer with timeout enforced
    */
   private void close(final BucketWriter bucketWriter)
-      throws IOException, InterruptedException {
+          throws IOException, InterruptedException {
 
     callWithTimeout(new Callable<Void>() {
       public Void call() throws Exception {

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
deleted file mode 100644
index 7b47942..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.io.IOException;
-
-import org.apache.flume.sink.FlumeFormatter;
-
-public class HDFSFormatterFactory {
-
-  HDFSFormatterFactory() {
-
-  }
-
-  static final String hdfsWritableFormat = "Writable";
-  static final String hdfsTextFormat = "Text";
-
-  static FlumeFormatter getFormatter(String formatType) throws IOException {
-    if (formatType.equalsIgnoreCase(hdfsWritableFormat)) {
-      return new HDFSWritableFormatter();
-    } else if (formatType.equalsIgnoreCase(hdfsTextFormat)) {
-      return new HDFSTextFormatter();
-    } else {
-      throw new IOException("Incorrect formatter type: " + formatType);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index bcc6f20..1e6d68f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -20,9 +20,7 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
 import org.apache.flume.Context;
-
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +32,9 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 public class HDFSSequenceFile implements HDFSWriter {
 
   private SequenceFile.Writer writer;
+  private String writeFormat;
+  private Context serializerContext;
+  private SeqFileFormatter formatter;
 
   public HDFSSequenceFile() {
     writer = null;
@@ -41,17 +42,22 @@ public class HDFSSequenceFile implements HDFSWriter {
 
   @Override
   public void configure(Context context) {
-    // no-op
+    // use binary writable format by default
+    writeFormat = context.getString("hdfs.writeFormat", SeqFileFormatterType.Writable.name());
+    serializerContext = new Context(
+            context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX));
+    formatter = SeqFileFormatterFactory
+            .getFormatter(writeFormat, serializerContext);
   }
 
   @Override
-  public void open(String filePath, FlumeFormatter fmt) throws IOException {
-    open(filePath, null, CompressionType.NONE, fmt);
+  public void open(String filePath) throws IOException {
+    open(filePath, null, CompressionType.NONE);
   }
 
   @Override
   public void open(String filePath, CompressionCodec codeC,
-      CompressionType compType, FlumeFormatter fmt) throws IOException {
+      CompressionType compType) throws IOException {
     Configuration conf = new Configuration();
     Path dstPath = new Path(filePath);
     FileSystem hdfs = dstPath.getFileSystem(conf);
@@ -59,17 +65,19 @@ public class HDFSSequenceFile implements HDFSWriter {
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
             (dstPath)) {
       FSDataOutputStream outStream = hdfs.append(dstPath);
-      writer = SequenceFile.createWriter(conf, outStream, fmt.getKeyClass(),
-          fmt.getValueClass(), compType, codeC);
+      writer = SequenceFile.createWriter(conf, outStream, formatter.getKeyClass(),
+          formatter.getValueClass(), compType, codeC);
     } else {
       writer = SequenceFile.createWriter(hdfs, conf, dstPath,
-          fmt.getKeyClass(), fmt.getValueClass(), compType, codeC);
+          formatter.getKeyClass(), formatter.getValueClass(), compType, codeC);
     }
   }
 
   @Override
-  public void append(Event e, FlumeFormatter formatter) throws IOException {
-    writer.append(formatter.getKey(e), formatter.getValue(e));
+  public void append(Event e) throws IOException {
+    for (SeqFileFormatter.Record record : formatter.format(e)) {
+      writer.append(record.getKey(), record.getValue());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
index 5839dbb..4b39f5d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
@@ -19,16 +19,13 @@
 
 package org.apache.flume.sink.hdfs;
 
-import java.util.Arrays;
-
+import java.util.Collections;
+import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
-//import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
 
-
-public class HDFSTextFormatter implements FlumeFormatter {
+public class HDFSTextFormatter implements SeqFileFormatter {
 
   private Text makeText(Event e) {
     Text textObject = new Text();
@@ -47,7 +44,13 @@ public class HDFSTextFormatter implements FlumeFormatter {
   }
 
   @Override
-  public Object getKey(Event e) {
+  public Iterable<Record> format(Event e) {
+    Object key = getKey(e);
+    Object value = getValue(e);
+    return Collections.singletonList(new Record(key, value));
+  }
+
+  private Object getKey(Event e) {
     // Write the data to HDFS
     String timestamp = e.getHeaders().get("timestamp");
     long eventStamp;
@@ -57,21 +60,20 @@ public class HDFSTextFormatter implements FlumeFormatter {
     } else {
       eventStamp = Long.valueOf(timestamp);
     }
-    LongWritable longObject = new LongWritable(eventStamp);
-    return longObject;
+    return new LongWritable(eventStamp);
   }
 
-  @Override
-  public Object getValue(Event e) {
+  private Object getValue(Event e) {
     return makeText(e);
   }
 
-  @Override
-  public byte[] getBytes(Event e) {
-    Text record = makeText(e);
-    record.append("\n".getBytes(), 0, 1);
-    byte[] rawBytes = record.getBytes();
-    return Arrays.copyOf(rawBytes, record.getLength());
+  public static class Builder implements SeqFileFormatter.Builder {
+
+    @Override
+    public SeqFileFormatter build(Context context) {
+      return new HDFSTextFormatter();
+    }
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
index 9f03389..cece506 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
@@ -17,12 +17,14 @@
  */
 package org.apache.flume.sink.hdfs;
 
+import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 
-public class HDFSWritableFormatter implements FlumeFormatter {
+import java.util.Collections;
+
+public class HDFSWritableFormatter implements SeqFileFormatter {
 
   private BytesWritable makeByteWritable(Event e) {
     BytesWritable bytesObject = new BytesWritable();
@@ -41,8 +43,13 @@ public class HDFSWritableFormatter implements FlumeFormatter {
   }
 
   @Override
-  public Object getKey(Event e) {
-    // Write the data to HDFS
+  public Iterable<Record> format(Event e) {
+    Object key = getKey(e);
+    Object value = getValue(e);
+    return Collections.singletonList(new Record(key, value));
+  }
+
+  private Object getKey(Event e) {
     String timestamp = e.getHeaders().get("timestamp");
     long eventStamp;
 
@@ -51,17 +58,20 @@ public class HDFSWritableFormatter implements FlumeFormatter {
     } else {
       eventStamp = Long.valueOf(timestamp);
     }
-    LongWritable longObject = new LongWritable(eventStamp);
-    return longObject;
+    return new LongWritable(eventStamp);
   }
 
-  @Override
-  public Object getValue(Event e) {
+  private Object getValue(Event e) {
     return makeByteWritable(e);
   }
 
-  @Override
-  public byte[] getBytes(Event e) {
-    return makeByteWritable(e).getBytes();
+  public static class Builder implements SeqFileFormatter.Builder {
+
+    @Override
+    public SeqFileFormatter build(Context context) {
+      return new HDFSWritableFormatter();
+    }
+
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
index 6408e9b..abca21f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWriter.java
@@ -22,23 +22,17 @@ import java.io.IOException;
 
 import org.apache.flume.Event;
 import org.apache.flume.conf.Configurable;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 public interface HDFSWriter extends Configurable {
 
-  public void open(String filePath, FlumeFormatter fmt) throws IOException;
-
-  // public void open(String filePath, CompressionCodec codec, CompressionType
-  // cType) throws IOException;
+  public void open(String filePath) throws IOException;
 
   public void open(String filePath, CompressionCodec codec,
-      CompressionType cType, FlumeFormatter fmt) throws IOException;
-
-  // public void append(long key, byte [] val) throws IOException;
+      CompressionType cType) throws IOException;
 
-  public void append(Event e, FlumeFormatter fmt) throws IOException;
+  public void append(Event e) throws IOException;
 
   public void sync() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
new file mode 100644
index 0000000..c25931c
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+
+public interface SeqFileFormatter {
+
+  Class<?> getKeyClass();
+
+  Class<?> getValueClass();
+
+  /**
+   * Format the given event into zero, one or more SequenceFile records
+   *
+   * @param e
+   *         event
+   * @return a list of records corresponding to the given event
+   */
+  Iterable<Record> format(Event e);
+
+  /**
+   * Knows how to construct this output formatter.<br/>
+   * <b>Note: Implementations MUST provide a public a no-arg constructor.</b>
+   */
+  public interface Builder {
+    public SeqFileFormatter build(Context context);
+  }
+
+  /**
+   * A key-value pair making up a record in an HDFS SequenceFile
+   */
+  public static class Record {
+    private final Object key;
+    private final Object value;
+
+    public Record(Object key, Object value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public Object getKey() {
+      return key;
+    }
+
+    public Object getValue() {
+      return value;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
new file mode 100644
index 0000000..20409ba
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SeqFileFormatterFactory {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(SeqFileFormatterFactory.class);
+
+  /**
+   * {@link Context} prefix
+   */
+  static final String CTX_PREFIX = "writeFormat.";
+
+  @SuppressWarnings("unchecked")
+  static SeqFileFormatter getFormatter(String formatType, Context context) {
+
+    Preconditions.checkNotNull(formatType,
+        "format type must not be null");
+
+    // try to find builder class in enum of known formatters
+    SeqFileFormatterType type;
+    try {
+      type = SeqFileFormatterType.valueOf(formatType);
+    } catch (IllegalArgumentException e) {
+      logger.debug("Not in enum, loading builder class: {}", formatType);
+      type = SeqFileFormatterType.Other;
+    }
+    Class<? extends SeqFileFormatter.Builder> builderClass =
+        type.getBuilderClass();
+
+    // handle the case where they have specified their own builder in the config
+    if (builderClass == null) {
+      try {
+        Class c = Class.forName(formatType);
+        if (c != null && SeqFileFormatter.Builder.class.isAssignableFrom(c)) {
+          builderClass = (Class<? extends SeqFileFormatter.Builder>) c;
+        } else {
+          logger.error("Unable to instantiate Builder from {}", formatType);
+          return null;
+        }
+      } catch (ClassNotFoundException ex) {
+        logger.error("Class not found: " + formatType, ex);
+        return null;
+      } catch (ClassCastException ex) {
+        logger.error("Class does not extend " +
+            SeqFileFormatter.Builder.class.getCanonicalName() + ": " +
+            formatType, ex);
+        return null;
+      }
+    }
+
+    // build the builder
+    SeqFileFormatter.Builder builder;
+    try {
+      builder = builderClass.newInstance();
+    } catch (InstantiationException ex) {
+      logger.error("Cannot instantiate builder: " + formatType, ex);
+      return null;
+    } catch (IllegalAccessException ex) {
+      logger.error("Cannot instantiate builder: " + formatType, ex);
+      return null;
+    }
+
+    return builder.build(context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
new file mode 100644
index 0000000..ff3eb84
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+public enum SeqFileFormatterType {
+  Writable(HDFSWritableFormatter.Builder.class),
+  Text(HDFSTextFormatter.Builder.class),
+  Other(null);
+
+  private final Class<? extends SeqFileFormatter.Builder> builderClass;
+
+  SeqFileFormatterType(Class<? extends SeqFileFormatter.Builder> builderClass) {
+    this.builderClass = builderClass;
+  }
+
+  public Class<? extends SeqFileFormatter.Builder> getBuilderClass() {
+    return builderClass;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
index 423f7c1..d325233 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadDataStream.java
@@ -20,16 +20,12 @@
 package org.apache.flume.sink.hdfs;
 
 import java.io.IOException;
-
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
-import org.apache.flume.sink.hdfs.HDFSSequenceFile;
-
 
 public class HDFSBadDataStream extends HDFSDataStream {
   public class HDFSBadSeqWriter extends HDFSSequenceFile {
     @Override
-    public void append(Event e, FlumeFormatter fmt) throws IOException {
+    public void append(Event e) throws IOException {
 
       if (e.getHeaders().containsKey("fault")) {
         throw new IOException("Injected fault");
@@ -41,7 +37,7 @@ public class HDFSBadDataStream extends HDFSDataStream {
           throw new IOException("append interrupted", eT);
         }
       }
-      super.append(e, fmt);
+      super.append(e);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
index a911ab7..63ab5af 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSBadSeqWriter.java
@@ -19,27 +19,26 @@
 
 package org.apache.flume.sink.hdfs;
 
-
 import java.io.IOException;
 
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
 public class HDFSBadSeqWriter extends HDFSSequenceFile {
   protected volatile boolean closed, opened;
+
   @Override
   public void open(String filePath, CompressionCodec codeC,
-      CompressionType compType, FlumeFormatter fmt) throws IOException {
-    super.open(filePath, codeC, compType, fmt);
+      CompressionType compType) throws IOException {
+    super.open(filePath, codeC, compType);
     if(closed) {
       opened = true;
     }
   }
 
   @Override
-  public void append(Event e, FlumeFormatter fmt) throws IOException {
+  public void append(Event e) throws IOException {
 
     if (e.getHeaders().containsKey("fault")) {
       throw new IOException("Injected fault");
@@ -59,7 +58,7 @@ public class HDFSBadSeqWriter extends HDFSSequenceFile {
       }
     }
 
-    super.append(e, fmt);
+    super.append(e);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
index 0b7910a..5e8628b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java
@@ -21,7 +21,6 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 
@@ -64,17 +63,17 @@ public class MockHDFSWriter implements HDFSWriter {
     // no-op
   }
 
-  public void open(String filePath, FlumeFormatter fmt) throws IOException {
+  public void open(String filePath) throws IOException {
     this.filePath = filePath;
     filesOpened++;
   }
 
-  public void open(String filePath, CompressionCodec codec, CompressionType cType, FlumeFormatter fmt) throws IOException {
+  public void open(String filePath, CompressionCodec codec, CompressionType cType) throws IOException {
     this.filePath = filePath;
     filesOpened++;
   }
 
-  public void append(Event e, FlumeFormatter fmt) throws IOException {
+  public void append(Event e) throws IOException {
     eventsWritten++;
     bytesWritten += e.getBody().length;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
new file mode 100644
index 0000000..ab1e463
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Arrays;
+
+public class MyCustomFormatter implements SeqFileFormatter {
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<BytesWritable> getValueClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public Iterable<Record> format(Event e) {
+    return Arrays.asList(
+        new Record(new LongWritable(1234L), new BytesWritable(new byte[10])),
+        new Record(new LongWritable(4567L), new BytesWritable(new byte[20]))
+    );
+  }
+
+  public static class Builder implements SeqFileFormatter.Builder {
+
+    @Override
+    public SeqFileFormatter build(Context context) {
+      return new MyCustomFormatter();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 5e5d5d1..829d7e8 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.sink.hdfs;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,7 +29,6 @@ import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -40,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
-import java.io.File;
 
 public class TestBucketWriter {
 
@@ -66,10 +65,9 @@ public class TestBucketWriter {
   public void testEventCountingRoller() throws IOException, InterruptedException {
     int maxEvents = 100;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, formatter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
         null, null);
 
@@ -91,10 +89,9 @@ public class TestBucketWriter {
   public void testSizeRoller() throws IOException, InterruptedException {
     int maxBytes = 300;
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, formatter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null);
 
@@ -118,10 +115,9 @@ public class TestBucketWriter {
     final int NUM_EVENTS = 10;
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
-        hdfsWriter, formatter, timedRollerPool, null,
+        hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null);
 
@@ -182,11 +178,11 @@ public class TestBucketWriter {
       }
 
       public void open(String filePath, CompressionCodec codec,
-          CompressionType cType, FlumeFormatter fmt) throws IOException {
+          CompressionType cType) throws IOException {
         open = true;
       }
 
-      public void open(String filePath, FlumeFormatter fmt) throws IOException {
+      public void open(String filePath) throws IOException {
         open = true;
       }
 
@@ -194,7 +190,7 @@ public class TestBucketWriter {
         open = false;
       }
 
-      public void append(Event e, FlumeFormatter fmt) throws IOException {
+      public void append(Event e) throws IOException {
         // we just re-open in append if closed
         open = true;
       }
@@ -207,7 +203,7 @@ public class TestBucketWriter {
 
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null);
 
@@ -228,10 +224,9 @@ public class TestBucketWriter {
       final String suffix = null;
 
       MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-      HDFSTextFormatter formatter = new HDFSTextFormatter();
       BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
           "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-          formatter, timedRollerPool, null,
+          timedRollerPool, null,
           new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
           0, null, null);
 
@@ -256,10 +251,9 @@ public class TestBucketWriter {
         final String suffix = ".avro";
 
         MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-        HDFSTextFormatter formatter = new HDFSTextFormatter();
         BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
             "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-            formatter, timedRollerPool, null,
+            timedRollerPool, null,
             new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
             0, null, null);
 
@@ -289,7 +283,7 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null);
 
@@ -308,7 +302,7 @@ public class TestBucketWriter {
     HDFSTextFormatter formatter = new HDFSTextFormatter();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
-        formatter, timedRollerPool, null,
+        timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
         0, null, null);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
index cfa946a..80f199b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSCompressedDataStream.java
@@ -20,10 +20,8 @@ package org.apache.flume.sink.hdfs;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharsetDecoder;
-import java.util.Arrays;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 
@@ -35,7 +33,6 @@ import org.apache.avro.io.DatumReader;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flume.sink.FlumeFormatter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -57,7 +54,6 @@ public class TestHDFSCompressedDataStream {
   private File file;
   private String fileURI;
   private CompressionCodecFactory factory;
-  private FlumeFormatter fmt;
 
   @Before
   public void init() throws Exception {
@@ -72,7 +68,6 @@ public class TestHDFSCompressedDataStream {
     path.getFileSystem(conf); // get FS with our conf cached
 
     this.factory = new CompressionCodecFactory(conf);
-    this.fmt = new HDFSTextFormatter();
   }
 
   // make sure the data makes it to disk if we sync() the data stream
@@ -82,7 +77,7 @@ public class TestHDFSCompressedDataStream {
     HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
     writer.configure(context);
     writer.open(fileURI, factory.getCodec(new Path(fileURI)),
-        SequenceFile.CompressionType.BLOCK, fmt);
+        SequenceFile.CompressionType.BLOCK);
 
     String[] bodies = { "yarf!" };
     writeBodies(writer, bodies);
@@ -91,7 +86,7 @@ public class TestHDFSCompressedDataStream {
     GZIPInputStream cmpIn = new GZIPInputStream(new FileInputStream(file));
     int len = cmpIn.read(buf);
     String result = new String(buf, 0, len, Charsets.UTF_8);
-    result = result.trim(); // HDFSTextFormatter adds a newline
+    result = result.trim(); // BodyTextEventSerializer adds a newline
 
     Assert.assertEquals("input and output must match", bodies[0], result);
   }
@@ -104,9 +99,8 @@ public class TestHDFSCompressedDataStream {
     HDFSCompressedDataStream writer = new HDFSCompressedDataStream();
     writer.configure(context);
 
-    FlumeFormatter fmt = new HDFSTextFormatter();
     writer.open(fileURI, factory.getCodec(new Path(fileURI)),
-        SequenceFile.CompressionType.BLOCK, fmt);
+        SequenceFile.CompressionType.BLOCK);
 
     String[] bodies = { "yarf!", "yarfing!" };
     writeBodies(writer, bodies);
@@ -140,7 +134,7 @@ public class TestHDFSCompressedDataStream {
       throws Exception {
     for (String body : bodies) {
       Event evt = EventBuilder.withBody(body, Charsets.UTF_8);
-      writer.append(evt, fmt);
+      writer.append(evt);
     }
     writer.sync();
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/0dacb250/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
new file mode 100644
index 0000000..9d17785
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSeqFileFormatterFactory {
+
+  @Test
+  public void getTextFormatter() {
+    SeqFileFormatter formatter =
+        SeqFileFormatterFactory.getFormatter("Text", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof HDFSTextFormatter);
+  }
+
+  @Test
+  public void getWritableFormatter() {
+    SeqFileFormatter formatter =
+        SeqFileFormatterFactory.getFormatter("Writable", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof HDFSWritableFormatter);
+  }
+
+  @Test
+  public void getCustomFormatter() {
+    SeqFileFormatter formatter = SeqFileFormatterFactory.getFormatter(
+        "org.apache.flume.sink.hdfs.MyCustomFormatter$Builder", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof MyCustomFormatter);
+  }
+
+}
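Taken together, these tests pin down the lookup contract of SeqFileFormatterFactory: the shorthand names "Text" and "Writable" select the built-in formatters, and any other value is treated as the fully qualified class name of a SeqFileFormatter.Builder to instantiate reflectively. A rough, simplified approximation of that dispatch follows; the SeqFileFormatterFactory committed earlier in this diff is the authoritative version, and error handling is omitted here.

    package org.apache.flume.sink.hdfs;

    import org.apache.flume.Context;

    // Rough approximation of the lookup behaviour exercised by the tests above.
    public class ApproxSeqFileFormatterLookup {

      static SeqFileFormatter getFormatter(String writeFormat, Context context)
          throws Exception {
        if ("Text".equals(writeFormat)) {
          return new HDFSTextFormatter();
        }
        if ("Writable".equals(writeFormat)) {
          return new HDFSWritableFormatter();
        }
        // Anything else is taken as the fully qualified name of a
        // SeqFileFormatter.Builder, e.g.
        // "org.apache.flume.sink.hdfs.MyCustomFormatter$Builder".
        SeqFileFormatter.Builder builder =
            (SeqFileFormatter.Builder) Class.forName(writeFormat).newInstance();
        return builder.build(context);
      }
    }

In an agent configuration this would presumably be driven by the HDFS sink's hdfs.writeFormat setting, e.g. agent.sinks.k1.hdfs.writeFormat = org.apache.flume.sink.hdfs.MyCustomFormatter$Builder; the exact property name is assumed from the existing Text/Writable option rather than stated in this diff.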

