flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mpe...@apache.org
Subject git commit: FLUME-1865. Rename the Sequence File formatters to Serializer to be consistent with the rest of Flume.
Date Thu, 24 Jan 2013 03:44:48 GMT
Updated Branches:
  refs/heads/flume-1.4 f00492f21 -> 8fa29e3ee


FLUME-1865. Rename the Sequence File formatters to Serializer to be consistent with the rest
of Flume.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/8fa29e3e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/8fa29e3e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/8fa29e3e

Branch: refs/heads/flume-1.4
Commit: 8fa29e3ee0969041b2b3f668d80fbe305729968d
Parents: f00492f
Author: Mike Percy <mpercy@apache.org>
Authored: Wed Jan 23 19:43:14 2013 -0800
Committer: Mike Percy <mpercy@apache.org>
Committed: Wed Jan 23 19:44:31 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/HDFSSequenceFile.java   |   22 ++--
 .../apache/flume/sink/hdfs/HDFSTextFormatter.java  |   79 -------------
 .../apache/flume/sink/hdfs/HDFSTextSerializer.java |   79 +++++++++++++
 .../flume/sink/hdfs/HDFSWritableFormatter.java     |   77 ------------
 .../flume/sink/hdfs/HDFSWritableSerializer.java    |   77 ++++++++++++
 .../apache/flume/sink/hdfs/SeqFileFormatter.java   |   68 -----------
 .../flume/sink/hdfs/SeqFileFormatterFactory.java   |   89 --------------
 .../flume/sink/hdfs/SeqFileFormatterType.java      |   37 ------
 .../flume/sink/hdfs/SequenceFileSerializer.java    |   68 +++++++++++
 .../sink/hdfs/SequenceFileSerializerFactory.java   |   90 +++++++++++++++
 .../sink/hdfs/SequenceFileSerializerType.java      |   38 ++++++
 .../apache/flume/sink/hdfs/MyCustomFormatter.java  |   58 ---------
 .../apache/flume/sink/hdfs/MyCustomSerializer.java |   58 +++++++++
 .../apache/flume/sink/hdfs/TestBucketWriter.java   |    6 +-
 .../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java  |    6 +-
 .../sink/hdfs/TestSeqFileFormatterFactory.java     |   58 ---------
 .../hdfs/TestSequenceFileSerializerFactory.java    |   59 ++++++++++
 17 files changed, 487 insertions(+), 482 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index e127f6a..3bd25f4 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -39,7 +39,7 @@ public class HDFSSequenceFile implements HDFSWriter {
   private SequenceFile.Writer writer;
   private String writeFormat;
   private Context serializerContext;
-  private SeqFileFormatter formatter;
+  private SequenceFileSerializer serializer;
   private boolean useRawLocalFileSystem;
 
   public HDFSSequenceFile() {
@@ -48,14 +48,15 @@ public class HDFSSequenceFile implements HDFSWriter {
 
   @Override
   public void configure(Context context) {
-    // use binary writable format by default
-    writeFormat = context.getString("hdfs.writeFormat", SeqFileFormatterType.Writable.name());
+    // use binary writable serialize by default
+    writeFormat = context.getString("hdfs.writeFormat",
+      SequenceFileSerializerType.Writable.name());
     useRawLocalFileSystem = context.getBoolean("hdfs.useRawLocalFileSystem",
         false);
     serializerContext = new Context(
-            context.getSubProperties(SeqFileFormatterFactory.CTX_PREFIX));
-    formatter = SeqFileFormatterFactory
-            .getFormatter(writeFormat, serializerContext);
+            context.getSubProperties(SequenceFileSerializerFactory.CTX_PREFIX));
+    serializer = SequenceFileSerializerFactory
+            .getSerializer(writeFormat, serializerContext);
     logger.info("writeFormat = " + writeFormat + ", UseRawLocalFileSystem = "
         + useRawLocalFileSystem);
   }
@@ -82,17 +83,18 @@ public class HDFSSequenceFile implements HDFSWriter {
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
             (dstPath)) {
       FSDataOutputStream outStream = hdfs.append(dstPath);
-      writer = SequenceFile.createWriter(conf, outStream, formatter.getKeyClass(),
-          formatter.getValueClass(), compType, codeC);
+      writer = SequenceFile.createWriter(conf, outStream, serializer
+        .getKeyClass(),
+        serializer.getValueClass(), compType, codeC);
     } else {
       writer = SequenceFile.createWriter(hdfs, conf, dstPath,
-          formatter.getKeyClass(), formatter.getValueClass(), compType, codeC);
+        serializer.getKeyClass(), serializer.getValueClass(), compType, codeC);
     }
   }
 
   @Override
   public void append(Event e) throws IOException {
-    for (SeqFileFormatter.Record record : formatter.format(e)) {
+    for (SequenceFileSerializer.Record record : serializer.serialize(e)) {
       writer.append(record.getKey(), record.getValue());
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
deleted file mode 100644
index 4b39f5d..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import java.util.Collections;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.LongWritable;
-
-public class HDFSTextFormatter implements SeqFileFormatter {
-
-  private Text makeText(Event e) {
-    Text textObject = new Text();
-    textObject.set(e.getBody(), 0, e.getBody().length);
-    return textObject;
-  }
-
-  @Override
-  public Class<LongWritable> getKeyClass() {
-    return LongWritable.class;
-  }
-
-  @Override
-  public Class<Text> getValueClass() {
-    return Text.class;
-  }
-
-  @Override
-  public Iterable<Record> format(Event e) {
-    Object key = getKey(e);
-    Object value = getValue(e);
-    return Collections.singletonList(new Record(key, value));
-  }
-
-  private Object getKey(Event e) {
-    // Write the data to HDFS
-    String timestamp = e.getHeaders().get("timestamp");
-    long eventStamp;
-
-    if (timestamp == null) {
-      eventStamp = System.currentTimeMillis();
-    } else {
-      eventStamp = Long.valueOf(timestamp);
-    }
-    return new LongWritable(eventStamp);
-  }
-
-  private Object getValue(Event e) {
-    return makeText(e);
-  }
-
-  public static class Builder implements SeqFileFormatter.Builder {
-
-    @Override
-    public SeqFileFormatter build(Context context) {
-      return new HDFSTextFormatter();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java
new file mode 100644
index 0000000..32fd206
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextSerializer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import java.util.Collections;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+
+public class HDFSTextSerializer implements SequenceFileSerializer {
+
+  private Text makeText(Event e) {
+    Text textObject = new Text();
+    textObject.set(e.getBody(), 0, e.getBody().length);
+    return textObject;
+  }
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<Text> getValueClass() {
+    return Text.class;
+  }
+
+  @Override
+  public Iterable<Record> serialize(Event e) {
+    Object key = getKey(e);
+    Object value = getValue(e);
+    return Collections.singletonList(new Record(key, value));
+  }
+
+  private Object getKey(Event e) {
+    // Write the data to HDFS
+    String timestamp = e.getHeaders().get("timestamp");
+    long eventStamp;
+
+    if (timestamp == null) {
+      eventStamp = System.currentTimeMillis();
+    } else {
+      eventStamp = Long.valueOf(timestamp);
+    }
+    return new LongWritable(eventStamp);
+  }
+
+  private Object getValue(Event e) {
+    return makeText(e);
+  }
+
+  public static class Builder implements SequenceFileSerializer.Builder {
+
+    @Override
+    public SequenceFileSerializer build(Context context) {
+      return new HDFSTextSerializer();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
deleted file mode 100644
index cece506..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.sink.hdfs;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.util.Collections;
-
-public class HDFSWritableFormatter implements SeqFileFormatter {
-
-  private BytesWritable makeByteWritable(Event e) {
-    BytesWritable bytesObject = new BytesWritable();
-    bytesObject.set(e.getBody(), 0, e.getBody().length);
-    return bytesObject;
-  }
-
-  @Override
-  public Class<LongWritable> getKeyClass() {
-    return LongWritable.class;
-  }
-
-  @Override
-  public Class<BytesWritable> getValueClass() {
-    return BytesWritable.class;
-  }
-
-  @Override
-  public Iterable<Record> format(Event e) {
-    Object key = getKey(e);
-    Object value = getValue(e);
-    return Collections.singletonList(new Record(key, value));
-  }
-
-  private Object getKey(Event e) {
-    String timestamp = e.getHeaders().get("timestamp");
-    long eventStamp;
-
-    if (timestamp == null) {
-      eventStamp = System.currentTimeMillis();
-    } else {
-      eventStamp = Long.valueOf(timestamp);
-    }
-    return new LongWritable(eventStamp);
-  }
-
-  private Object getValue(Event e) {
-    return makeByteWritable(e);
-  }
-
-  public static class Builder implements SeqFileFormatter.Builder {
-
-    @Override
-    public SeqFileFormatter build(Context context) {
-      return new HDFSWritableFormatter();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java
new file mode 100644
index 0000000..b25a6ea
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableSerializer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Collections;
+
+public class HDFSWritableSerializer implements SequenceFileSerializer {
+
+  private BytesWritable makeByteWritable(Event e) {
+    BytesWritable bytesObject = new BytesWritable();
+    bytesObject.set(e.getBody(), 0, e.getBody().length);
+    return bytesObject;
+  }
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<BytesWritable> getValueClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public Iterable<Record> serialize(Event e) {
+    Object key = getKey(e);
+    Object value = getValue(e);
+    return Collections.singletonList(new Record(key, value));
+  }
+
+  private Object getKey(Event e) {
+    String timestamp = e.getHeaders().get("timestamp");
+    long eventStamp;
+
+    if (timestamp == null) {
+      eventStamp = System.currentTimeMillis();
+    } else {
+      eventStamp = Long.valueOf(timestamp);
+    }
+    return new LongWritable(eventStamp);
+  }
+
+  private Object getValue(Event e) {
+    return makeByteWritable(e);
+  }
+
+  public static class Builder implements SequenceFileSerializer.Builder {
+
+    @Override
+    public SequenceFileSerializer build(Context context) {
+      return new HDFSWritableSerializer();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
deleted file mode 100644
index c25931c..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-
-public interface SeqFileFormatter {
-
-  Class<?> getKeyClass();
-
-  Class<?> getValueClass();
-
-  /**
-   * Format the given event into zero, one or more SequenceFile records
-   *
-   * @param e
-   *         event
-   * @return a list of records corresponding to the given event
-   */
-  Iterable<Record> format(Event e);
-
-  /**
-   * Knows how to construct this output formatter.<br/>
-   * <b>Note: Implementations MUST provide a public a no-arg constructor.</b>
-   */
-  public interface Builder {
-    public SeqFileFormatter build(Context context);
-  }
-
-  /**
-   * A key-value pair making up a record in an HDFS SequenceFile
-   */
-  public static class Record {
-    private final Object key;
-    private final Object value;
-
-    public Record(Object key, Object value) {
-      this.key = key;
-      this.value = value;
-    }
-
-    public Object getKey() {
-      return key;
-    }
-
-    public Object getValue() {
-      return value;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
deleted file mode 100644
index 20409ba..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterFactory.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hdfs;
-
-import com.google.common.base.Preconditions;
-import org.apache.flume.Context;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SeqFileFormatterFactory {
-
-  private static final Logger logger =
-      LoggerFactory.getLogger(SeqFileFormatterFactory.class);
-
-  /**
-   * {@link Context} prefix
-   */
-  static final String CTX_PREFIX = "writeFormat.";
-
-  @SuppressWarnings("unchecked")
-  static SeqFileFormatter getFormatter(String formatType, Context context) {
-
-    Preconditions.checkNotNull(formatType,
-        "format type must not be null");
-
-    // try to find builder class in enum of known formatters
-    SeqFileFormatterType type;
-    try {
-      type = SeqFileFormatterType.valueOf(formatType);
-    } catch (IllegalArgumentException e) {
-      logger.debug("Not in enum, loading builder class: {}", formatType);
-      type = SeqFileFormatterType.Other;
-    }
-    Class<? extends SeqFileFormatter.Builder> builderClass =
-        type.getBuilderClass();
-
-    // handle the case where they have specified their own builder in the config
-    if (builderClass == null) {
-      try {
-        Class c = Class.forName(formatType);
-        if (c != null && SeqFileFormatter.Builder.class.isAssignableFrom(c)) {
-          builderClass = (Class<? extends SeqFileFormatter.Builder>) c;
-        } else {
-          logger.error("Unable to instantiate Builder from {}", formatType);
-          return null;
-        }
-      } catch (ClassNotFoundException ex) {
-        logger.error("Class not found: " + formatType, ex);
-        return null;
-      } catch (ClassCastException ex) {
-        logger.error("Class does not extend " +
-            SeqFileFormatter.Builder.class.getCanonicalName() + ": " +
-            formatType, ex);
-        return null;
-      }
-    }
-
-    // build the builder
-    SeqFileFormatter.Builder builder;
-    try {
-      builder = builderClass.newInstance();
-    } catch (InstantiationException ex) {
-      logger.error("Cannot instantiate builder: " + formatType, ex);
-      return null;
-    } catch (IllegalAccessException ex) {
-      logger.error("Cannot instantiate builder: " + formatType, ex);
-      return null;
-    }
-
-    return builder.build(context);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
deleted file mode 100644
index ff3eb84..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SeqFileFormatterType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hdfs;
-
-public enum SeqFileFormatterType {
-  Writable(HDFSWritableFormatter.Builder.class),
-  Text(HDFSTextFormatter.Builder.class),
-  Other(null);
-
-  private final Class<? extends SeqFileFormatter.Builder> builderClass;
-
-  SeqFileFormatterType(Class<? extends SeqFileFormatter.Builder> builderClass) {
-    this.builderClass = builderClass;
-  }
-
-  public Class<? extends SeqFileFormatter.Builder> getBuilderClass() {
-    return builderClass;
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java
new file mode 100644
index 0000000..ec2b760
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializer.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+
+public interface SequenceFileSerializer {
+
+  Class<?> getKeyClass();
+
+  Class<?> getValueClass();
+
+  /**
+   * Format the given event into zero, one or more SequenceFile records
+   *
+   * @param e
+   *         event
+   * @return a list of records corresponding to the given event
+   */
+  Iterable<Record> serialize(Event e);
+
+  /**
+   * Knows how to construct this output formatter.<br/>
+   * <b>Note: Implementations MUST provide a public a no-arg constructor.</b>
+   */
+  public interface Builder {
+    public SequenceFileSerializer build(Context context);
+  }
+
+  /**
+   * A key-value pair making up a record in an HDFS SequenceFile
+   */
+  public static class Record {
+    private final Object key;
+    private final Object value;
+
+    public Record(Object key, Object value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public Object getKey() {
+      return key;
+    }
+
+    public Object getValue() {
+      return value;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java
new file mode 100644
index 0000000..5678836
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import com.google.common.base.Preconditions;
+import org.apache.flume.Context;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequenceFileSerializerFactory {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(SequenceFileSerializerFactory.class);
+
+  /**
+   * {@link Context} prefix
+   */
+  static final String CTX_PREFIX = "writeFormat.";
+
+  @SuppressWarnings("unchecked")
+  static SequenceFileSerializer getSerializer(String formatType,
+                                              Context context) {
+
+    Preconditions.checkNotNull(formatType,
+        "serialize type must not be null");
+
+    // try to find builder class in enum of known formatters
+    SequenceFileSerializerType type;
+    try {
+      type = SequenceFileSerializerType.valueOf(formatType);
+    } catch (IllegalArgumentException e) {
+      logger.debug("Not in enum, loading builder class: {}", formatType);
+      type = SequenceFileSerializerType.Other;
+    }
+    Class<? extends SequenceFileSerializer.Builder> builderClass =
+        type.getBuilderClass();
+
+    // handle the case where they have specified their own builder in the config
+    if (builderClass == null) {
+      try {
+        Class c = Class.forName(formatType);
+        if (c != null && SequenceFileSerializer.Builder.class.isAssignableFrom(c))
{
+          builderClass = (Class<? extends SequenceFileSerializer.Builder>) c;
+        } else {
+          logger.error("Unable to instantiate Builder from {}", formatType);
+          return null;
+        }
+      } catch (ClassNotFoundException ex) {
+        logger.error("Class not found: " + formatType, ex);
+        return null;
+      } catch (ClassCastException ex) {
+        logger.error("Class does not extend " +
+            SequenceFileSerializer.Builder.class.getCanonicalName() + ": " +
+            formatType, ex);
+        return null;
+      }
+    }
+
+    // build the builder
+    SequenceFileSerializer.Builder builder;
+    try {
+      builder = builderClass.newInstance();
+    } catch (InstantiationException ex) {
+      logger.error("Cannot instantiate builder: " + formatType, ex);
+      return null;
+    } catch (IllegalAccessException ex) {
+      logger.error("Cannot instantiate builder: " + formatType, ex);
+      return null;
+    }
+
+    return builder.build(context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
new file mode 100644
index 0000000..4351488
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+public enum SequenceFileSerializerType {
+  Writable(HDFSWritableSerializer.Builder.class),
+  Text(HDFSTextSerializer.Builder.class),
+  Other(null);
+
+  private final Class<? extends SequenceFileSerializer.Builder> builderClass;
+
+  SequenceFileSerializerType(
+    Class<? extends SequenceFileSerializer.Builder> builderClass) {
+    this.builderClass = builderClass;
+  }
+
+  public Class<? extends SequenceFileSerializer.Builder> getBuilderClass() {
+    return builderClass;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
deleted file mode 100644
index ab1e463..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flume.sink.hdfs;
-
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.util.Arrays;
-
-public class MyCustomFormatter implements SeqFileFormatter {
-
-  @Override
-  public Class<LongWritable> getKeyClass() {
-    return LongWritable.class;
-  }
-
-  @Override
-  public Class<BytesWritable> getValueClass() {
-    return BytesWritable.class;
-  }
-
-  @Override
-  public Iterable<Record> format(Event e) {
-    return Arrays.asList(
-        new Record(new LongWritable(1234L), new BytesWritable(new byte[10])),
-        new Record(new LongWritable(4567L), new BytesWritable(new byte[20]))
-    );
-  }
-
-  public static class Builder implements SeqFileFormatter.Builder {
-
-    @Override
-    public SeqFileFormatter build(Context context) {
-      return new MyCustomFormatter();
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
new file mode 100644
index 0000000..72164fd
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomSerializer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.util.Arrays;
+
+public class MyCustomSerializer implements SequenceFileSerializer {
+
+  @Override
+  public Class<LongWritable> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  @Override
+  public Class<BytesWritable> getValueClass() {
+    return BytesWritable.class;
+  }
+
+  @Override
+  public Iterable<Record> serialize(Event e) {
+    return Arrays.asList(
+        new Record(new LongWritable(1234L), new BytesWritable(new byte[10])),
+        new Record(new LongWritable(4567L), new BytesWritable(new byte[20]))
+    );
+  }
+
+  public static class Builder implements SequenceFileSerializer.Builder {
+
+    @Override
+    public SequenceFileSerializer build(Context context) {
+      return new MyCustomSerializer();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 829d7e8..ebe277c 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -195,7 +195,7 @@ public class TestBucketWriter {
         open = true;
       }
     };
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    HDFSTextSerializer serializer = new HDFSTextSerializer();
     File tmpFile = File.createTempFile("flume", "test");
     tmpFile.deleteOnExit();
     String path = tmpFile.getParent();
@@ -280,7 +280,7 @@ public class TestBucketWriter {
     final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    HDFSTextSerializer formatter = new HDFSTextSerializer();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         timedRollerPool, null,
@@ -299,7 +299,7 @@ public class TestBucketWriter {
     final String SUFFIX = "WELCOME_TO_THE_HELLMOUNTH";
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    HDFSTextFormatter formatter = new HDFSTextFormatter();
+    HDFSTextSerializer serializer = new HDFSTextSerializer();
     BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE, hdfsWriter,
         timedRollerPool, null,

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
index bcd19e9..2e71069 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -153,7 +153,7 @@ public class TestHDFSEventSinkOnMiniCluster {
   }
 
   /**
-   * Writes two events in GZIP-compressed format.
+   * Writes two events in GZIP-compressed serialize.
    */
   @Test
   public void simpleHDFSGZipCompressedTest() throws EventDeliveryException, IOException {

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
deleted file mode 100644
index 9d17785..0000000
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSeqFileFormatterFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.sink.hdfs;
-
-import org.apache.flume.Context;
-import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
-
-public class TestSeqFileFormatterFactory {
-
-  @Test
-  public void getTextFormatter() {
-    SeqFileFormatter formatter =
-        SeqFileFormatterFactory.getFormatter("Text", new Context());
-
-    assertTrue(formatter != null);
-    assertTrue(formatter.getClass().getName(),
-        formatter instanceof HDFSTextFormatter);
-  }
-
-  @Test
-  public void getWritableFormatter() {
-    SeqFileFormatter formatter =
-        SeqFileFormatterFactory.getFormatter("Writable", new Context());
-
-    assertTrue(formatter != null);
-    assertTrue(formatter.getClass().getName(),
-        formatter instanceof HDFSWritableFormatter);
-  }
-
-  @Test
-  public void getCustomFormatter() {
-    SeqFileFormatter formatter = SeqFileFormatterFactory.getFormatter(
-        "org.apache.flume.sink.hdfs.MyCustomFormatter$Builder", new Context());
-
-    assertTrue(formatter != null);
-    assertTrue(formatter.getClass().getName(),
-        formatter instanceof MyCustomFormatter);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/8fa29e3e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
new file mode 100644
index 0000000..6381edc
--- /dev/null
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hdfs;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestSequenceFileSerializerFactory {
+
+  @Test
+  public void getTextFormatter() {
+    SequenceFileSerializer formatter =
+        SequenceFileSerializerFactory.getSerializer("Text", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof HDFSTextSerializer);
+  }
+
+  @Test
+  public void getWritableFormatter() {
+    SequenceFileSerializer formatter =
+        SequenceFileSerializerFactory.getSerializer("Writable", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof HDFSWritableSerializer);
+  }
+
+  @Test
+  public void getCustomFormatter() {
+    SequenceFileSerializer formatter = SequenceFileSerializerFactory
+      .getSerializer(
+        "org.apache.flume.sink.hdfs.MyCustomSerializer$Builder", new Context());
+
+    assertTrue(formatter != null);
+    assertTrue(formatter.getClass().getName(),
+        formatter instanceof MyCustomSerializer);
+  }
+
+}


Mime
View raw message