metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmerri...@apache.org
Subject [metron] branch master updated: METRON-1968 Messages are lost when a parser produces multiple messages and batch size is greater than 1 (merrimanr) closes apache/metron#1330
Date Tue, 26 Feb 2019 19:37:39 GMT
This is an automated email from the ASF dual-hosted git repository.

rmerriman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d2cec7  METRON-1968 Messages are lost when a parser produces multiple messages and batch size is greater than 1 (merrimanr) closes apache/metron#1330
2d2cec7 is described below

commit 2d2cec749c52659f4cf5b9177047b018222a13cd
Author: merrimanr <merrimanr@gmail.com>
AuthorDate: Tue Feb 26 13:37:24 2019 -0600

    METRON-1968 Messages are lost when a parser produces multiple messages and batch size is greater than 1 (merrimanr) closes apache/metron#1330
---
 .../apache/metron/common/utils/MessageUtils.java   |   4 +
 .../{MessageWriter.java => BulkMessage.java}       |  43 ++-
 .../metron/common/writer/BulkMessageWriter.java    |  13 +-
 .../metron/common/writer/BulkWriterResponse.java   |  51 +--
 .../writer/{MessageWriter.java => MessageId.java}  |  42 ++-
 .../apache/metron/common/writer/MessageWriter.java |   4 +-
 .../bulk/ElasticsearchBulkDocumentWriter.java      |  14 +-
 .../elasticsearch/writer/ElasticsearchWriter.java  |  51 +--
 ...edDocument.java => MessageIdBasedDocument.java} |  24 +-
 .../writer/ElasticsearchWriterTest.java            | 174 ++++-----
 .../writer/SimpleHbaseEnrichmentWriter.java        |  18 +-
 .../apache/metron/hbase/writer/HBaseWriter.java    |  88 -----
 .../data/jsonMapQuery/parsed/jsonMapExampleParsed  |  10 +-
 .../data/jsonMapQuery/raw/jsonMapExampleOutput     |   2 +-
 .../config/zookeeper/parsers/jsonMapQuery.json     |   2 +-
 .../writers/SimpleHBaseEnrichmentWriterTest.java   |  16 +-
 .../org/apache/metron/parsers/bolt/ParserBolt.java |  87 ++---
 .../org/apache/metron/parsers/bolt/WriterBolt.java |  22 +-
 .../apache/metron/parsers/bolt/WriterHandler.java  |  52 +--
 .../apache/metron/parsers/bolt/ParserBoltTest.java | 151 +++++---
 .../apache/metron/parsers/bolt/WriterBoltTest.java | 106 ++++--
 .../integration/validation/StormParserDriver.java  |  35 +-
 .../org/apache/metron/pcap/writer/PcapWriter.java  |  59 ---
 .../org/apache/metron/solr/writer/SolrWriter.java  |  22 +-
 .../schema/SchemaValidationIntegrationTest.java    |  23 +-
 .../apache/metron/solr/writer/SolrWriterTest.java  |   9 +-
 .../org/apache/metron/writer/AckTuplesPolicy.java  | 159 +++++++++
 .../org/apache/metron/writer/BatchSizePolicy.java  |  58 +++
 .../apache/metron/writer/BatchTimeoutPolicy.java   | 108 ++++++
 .../apache/metron/writer/BulkWriterComponent.java  | 397 +++++++--------------
 .../java/org/apache/metron/writer/FlushPolicy.java |  51 +++
 .../java/org/apache/metron/writer/NoopWriter.java  |  11 +-
 .../apache/metron/writer/WriterToBulkWriter.java   |  16 +-
 .../metron/writer/bolt/BatchTimeoutHelper.java     |   6 +-
 .../metron/writer/bolt/BulkMessageWriterBolt.java  | 110 ++++--
 .../org/apache/metron/writer/hdfs/HdfsWriter.java  |  42 +--
 .../apache/metron/writer/kafka/KafkaWriter.java    |  32 +-
 .../apache/metron/writer/AckTuplesPolicyTest.java  | 218 +++++++++++
 .../apache/metron/writer/BatchSizePolicyTest.java  |  72 ++++
 .../metron/writer/BatchTimeoutPolicyTest.java      | 105 ++++++
 .../metron/writer/BulkWriterComponentTest.java     | 355 +++++++-----------
 .../metron/writer/bolt/BatchTimeoutHelperTest.java |  22 +-
 .../writer}/bolt/BulkMessageWriterBoltTest.java    | 326 +++++++++--------
 .../apache/metron/writer/hdfs/HdfsWriterTest.java  |  55 ++-
 .../metron/writer/kafka/KafkaWriterTest.java       | 134 ++++++-
 45 files changed, 2017 insertions(+), 1382 deletions(-)

diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
index df711fa..d404c95 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/MessageUtils.java
@@ -22,6 +22,10 @@ import org.json.simple.JSONObject;
 
 public class MessageUtils {
 
+  public static String getGuid(JSONObject message) {
+    return (String) message.get(Constants.GUID);
+  }
+
   public static String getSensorType(JSONObject message) {
     return (String) message.get(Constants.SENSOR_TYPE);
   }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessage.java
similarity index 51%
copy from metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
copy to metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessage.java
index b0e48a9..a00f9b8 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessage.java
@@ -17,15 +17,42 @@
  */
 package org.apache.metron.common.writer;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import java.util.Objects;
 
-import java.io.Serializable;
+public class BulkMessage<MESSAGE_T> {
 
-public interface MessageWriter<T> extends AutoCloseable, Serializable {
+  private MessageId id;
+  private MESSAGE_T message;
 
-  void init();
-  void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
-  String getName();
+  public BulkMessage(MessageId id, MESSAGE_T message) {
+    this.id = id;
+    this.message = message;
+  }
+
+  public BulkMessage(String id, MESSAGE_T message) {
+    this(new MessageId(id), message);
+  }
+
+  public MessageId getId() {
+    return id;
+  }
+
+  public MESSAGE_T getMessage() {
+    return message;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    BulkMessage<?> that = (BulkMessage<?>) o;
+    return Objects.equals(id, that.id) &&
+            Objects.equals(message, that.message);
+  }
+
+  @Override
+  public int hashCode() {
+
+    return Objects.hash(id, message);
+  }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
index 64ba0cc..7f92d37 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkMessageWriter.java
@@ -18,9 +18,6 @@
 package org.apache.metron.common.writer;
 
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.EnrichmentConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
 import java.io.Serializable;
@@ -32,18 +29,16 @@ public interface BulkMessageWriter<MESSAGE_T> extends AutoCloseable, Serializabl
   void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration config) throws Exception;
 
   /**
-  * Writes the messages to a particular output (e.g. Elasticsearch). Exceptions trigger failure of the entire batch.
-  * @param sensorType The type of sensor being generating the messages
+  * Writes the messages to a particular output (e.g. Elasticsearch).  A response is returned with successful and failed message ids.
+  * @param sensorType The type of sensor generating the messages
   * @param configurations Configurations that should be passed to the writer (e.g. index and
-  * @param tuples The Tuples that produced the message to be written
-  * @param messages  The message to be written
+  * @param messages  A list of messages to be written.  Message ids are used in the response to report successes/failures.
   * @return A response containing successes and failures within the batch.
   * @throws Exception If an unrecoverable error is made, an Exception is thrown which should be treated as a full-batch failure (e.g. target system is down).
   */
   BulkWriterResponse write(String sensorType
             , WriterConfiguration configurations
-            , Iterable<Tuple> tuples
-            , List<MESSAGE_T> messages
+            , List<BulkMessage<MESSAGE_T>> messages
             ) throws Exception;
 
   String getName();
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterResponse.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterResponse.java
index 7606153..20fa70c 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterResponse.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/BulkWriterResponse.java
@@ -18,33 +18,38 @@
 
 package org.apache.metron.common.writer;
 
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * This class contains the results of a {@link org.apache.metron.common.writer.BulkMessageWriter#write(String, WriterConfiguration, List)}
+ * call.  Each message in a batch either succeeds or fails and is represented in the response as a
+ * {@link org.apache.metron.common.writer.MessageId}.
+ */
 public class BulkWriterResponse {
-    private Multimap<Throwable, Tuple> errors = ArrayListMultimap.create();
-    private List<Tuple> successes = new ArrayList<>();
+    private Multimap<Throwable, MessageId> errors = ArrayListMultimap.create();
+    private List<MessageId> successes = new ArrayList<>();
 
-    public void addError(Throwable error, Tuple tuple) {
-        errors.put(error, tuple);
+    public void addError(Throwable error, MessageId id) {
+        errors.put(error, id);
     }
 
-    /**
-     * Adds provided errors and associated tuples.
-     *
-     * @param error The error to add
-     * @param tuples Iterable of all tuples with the error
-     */
-    public void addAllErrors(Throwable error, Iterable<Tuple> tuples) {
-        if(tuples != null) {
-            errors.putAll(error, tuples);
+  /**
+   * Adds provided errors and associated message ids.
+   *
+   * @param error The error to add
+   * @param ids Iterable of all messages with the error
+   */
+    public void addAllErrors(Throwable error, Iterable<MessageId> ids) {
+        if(ids != null) {
+            errors.putAll(error, ids);
         }
     }
 
@@ -52,26 +57,26 @@ public class BulkWriterResponse {
         return !errors.isEmpty();
     }
 
-    public void addSuccess(Tuple success) {
+    public void addSuccess(MessageId success) {
         successes.add(success);
     }
 
-    /**
-     * Adds all provided successes.
-     *
-     * @param allSuccesses Successes to add
-     */
-    public void addAllSuccesses(Iterable<Tuple> allSuccesses) {
+  /**
+   * Adds all provided successes.
+   *
+   * @param allSuccesses Successes to add
+   */
+    public void addAllSuccesses(Iterable<MessageId> allSuccesses) {
         if(allSuccesses != null) {
             Iterables.addAll(successes, allSuccesses);
         }
     }
 
-    public Map<Throwable, Collection<Tuple>> getErrors() {
+    public Map<Throwable, Collection<MessageId>> getErrors() {
         return errors.asMap();
     }
 
-    public List<Tuple> getSuccesses() {
+    public List<MessageId> getSuccesses() {
         return successes;
     }
 
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageId.java
similarity index 58%
copy from metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
copy to metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageId.java
index b0e48a9..5227a04 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageId.java
@@ -15,17 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.metron.common.writer;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import java.util.Objects;
+
+public class MessageId {
+
+  private String id;
+
+  public MessageId(String id) {
+    this.id = id;
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    MessageId messageId = (MessageId) o;
+    return Objects.equals(id, messageId.id);
+  }
+
+  @Override
+  public int hashCode() {
+
+    return Objects.hash(id);
+  }
 
-import java.io.Serializable;
+  @Override
+  public String toString() {
+    return "MessageId{" +
+            "id='" + id + '\'' +
+            '}';
+  }
 
-public interface MessageWriter<T> extends AutoCloseable, Serializable {
 
-  void init();
-  void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
-  String getName();
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
index b0e48a9..65f6484 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/writer/MessageWriter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.metron.common.writer;
 
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 
 import java.io.Serializable;
@@ -26,6 +24,6 @@ import java.io.Serializable;
 public interface MessageWriter<T> extends AutoCloseable, Serializable {
 
   void init();
-  void write(String sensorType, WriterConfiguration configurations, Tuple tuple, T message) throws Exception;
+  void write(String sensorType, WriterConfiguration configurations, BulkMessage<T> message) throws Exception;
   String getName();
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
index 7aea2fc..bde5664 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Writes documents to an Elasticsearch index in bulk.
@@ -86,7 +88,14 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk
             // submit the request and handle the response
             BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
             handleBulkResponse(bulkResponse, documents, results);
-
+            if (LOG.isDebugEnabled()) {
+                String shards = Arrays.stream(bulkResponse.getItems())
+                        .map(bulkItemResponse -> bulkItemResponse.getResponse().getShardId().toString())
+                        .collect(Collectors.joining(","));
+                LOG.debug("{} results written to shards {} in {} ms; batchSize={}, success={}, failed={}",
+                        bulkResponse.getItems().length, shards, bulkResponse.getTookInMillis(),
+                        documents.size(), results.getSuccesses().size(), results.getFailures().size());
+            }
         } catch(IOException e) {
             // assume all documents have failed
             for(Indexable indexable: documents) {
@@ -99,9 +108,6 @@ public class ElasticsearchBulkDocumentWriter<D extends Document> implements Bulk
             // flush all documents no matter which ones succeeded or failed
             documents.clear();
         }
-
-        LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
-                documents.size(), results.getSuccesses().size(), results.getFailures().size());
         return results;
     }
 
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index a3459d8..7d14c11 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,12 +17,12 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
-import com.google.common.collect.Lists;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
 import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
@@ -34,7 +34,6 @@ import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
@@ -65,11 +63,11 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   /**
    * Responsible for writing documents.
    *
-   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
-   * a {@link Tuple} and the document created from the contents of that tuple. If
-   * a document cannot be written, the associated tuple needs to be failed.
+   * <p>Uses a {@link MessageIdBasedDocument} to maintain the relationship between
+   * a {@link org.apache.metron.common.writer.MessageId} and the document created from the contents of that message. If
+   * a document cannot be written, the associated message needs to be reported as a failure.
    */
-  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
+  private transient BulkDocumentWriter<MessageIdBasedDocument> documentWriter;
 
   /**
    * A simple data formatter used to build the appropriate Elasticsearch index name.
@@ -91,50 +89,39 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   @Override
   public BulkWriterResponse write(String sensorType,
                                   WriterConfiguration configurations,
-                                  Iterable<Tuple> tuplesIter,
-                                  List<JSONObject> messages) {
+                                  List<BulkMessage<JSONObject>> messages) {
 
     // fetch the field name converter for this sensor type
     FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
     String indexPostfix = dateFormat.format(new Date());
     String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
 
-    // the number of tuples must match the number of messages
-    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
-    int batchSize = tuples.size();
-    if(messages.size() != batchSize) {
-      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
-              tuples.size(), messages.size()));
-    }
-
     // create a document from each message
-    for(int i=0; i<tuples.size(); i++) {
-      JSONObject message = messages.get(i);
-      Tuple tuple = tuples.get(i);
-      TupleBasedDocument document = createDocument(message, tuple, sensorType, fieldNameConverter);
+    for(BulkMessage<JSONObject> bulkWriterMessage: messages) {
+      MessageIdBasedDocument document = createDocument(bulkWriterMessage, sensorType, fieldNameConverter);
       documentWriter.addDocument(document, indexName);
     }
 
     // write the documents
-    BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write();
+    BulkDocumentWriterResults<MessageIdBasedDocument> results = documentWriter.write();
 
     // build the response
     BulkWriterResponse response = new BulkWriterResponse();
-    for(WriteSuccess<TupleBasedDocument> success: results.getSuccesses()) {
-      response.addSuccess(success.getDocument().getTuple());
+    for(WriteSuccess<MessageIdBasedDocument> success: results.getSuccesses()) {
+      response.addSuccess(success.getDocument().getMessageId());
     }
-    for(WriteFailure<TupleBasedDocument> failure: results.getFailures()) {
-      response.addError(failure.getCause(), failure.getDocument().getTuple());
+    for(WriteFailure<MessageIdBasedDocument> failure: results.getFailures()) {
+      response.addError(failure.getCause(), failure.getDocument().getMessageId());
     }
     return response;
   }
 
-  private TupleBasedDocument createDocument(JSONObject message,
-                                            Tuple tuple,
-                                            String sensorType,
-                                            FieldNameConverter fieldNameConverter) {
+  private MessageIdBasedDocument createDocument(BulkMessage<JSONObject> bulkWriterMessage,
+                                                String sensorType,
+                                                FieldNameConverter fieldNameConverter) {
     // transform the message fields to the source fields of the indexed document
     JSONObject source = new JSONObject();
+    JSONObject message = bulkWriterMessage.getMessage();
     for(Object k : message.keySet()){
       copyField(k.toString(), message, source, fieldNameConverter);
     }
@@ -154,7 +141,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName());
     }
 
-    return new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
+    return new MessageIdBasedDocument(source, guid, sensorType, timestamp, bulkWriterMessage.getId());
   }
 
   @Override
@@ -199,7 +186,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
    * Set the document writer.  Primarily used for testing.
    * @param documentWriter The {@link BulkDocumentWriter} to use.
    */
-  public void setDocumentWriter(BulkDocumentWriter<TupleBasedDocument> documentWriter) {
+  public void setDocumentWriter(BulkDocumentWriter<MessageIdBasedDocument> documentWriter) {
     this.documentWriter = documentWriter;
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/MessageIdBasedDocument.java
similarity index 64%
rename from metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
rename to metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/MessageIdBasedDocument.java
index ba44937..62f4c75 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/MessageIdBasedDocument.java
@@ -17,28 +17,28 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.indexing.dao.update.Document;
-import org.apache.storm.tuple.Tuple;
 
 import java.util.Map;
 
 /**
- * An {@link Document} that is created from the contents of a {@link Tuple}.
+ * A {@link Document} that is created from a message id.
  */
-public class TupleBasedDocument extends Document {
+public class MessageIdBasedDocument extends Document {
 
-    private Tuple tuple;
+    private MessageId messageId;
 
-    public TupleBasedDocument(Map<String, Object> document,
-                              String guid,
-                              String sensorType,
-                              Long timestamp,
-                              Tuple tuple) {
+    public MessageIdBasedDocument(Map<String, Object> document,
+                                  String guid,
+                                  String sensorType,
+                                  Long timestamp,
+                                  MessageId messageId) {
         super(document, guid, sensorType, timestamp);
-        this.tuple = tuple;
+        this.messageId = messageId;
     }
 
-    public Tuple getTuple() {
-        return tuple;
+    public MessageId getMessageId() {
+        return messageId;
     }
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index e5e85b0..3d6a3fa 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -20,11 +20,12 @@ package org.apache.metron.elasticsearch.writer;
 
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
 import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Test;
@@ -40,7 +41,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -62,216 +62,198 @@ public class ElasticsearchWriterTest {
 
     @Test
     public void shouldWriteSuccessfully() {
-        // create a tuple and a message associated with that tuple
-        List<Tuple> tuples = createTuples(1);
-        List<JSONObject> messages = createMessages(1);
+        // create a message id and a message associated with that id
+        List<BulkMessage<JSONObject>> messages = createMessages(1);
 
         // create a document writer which will successfully write all
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0)));
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // response should only contain successes
         assertFalse(response.hasErrors());
-        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(new MessageId("message1")));
     }
 
     @Test
     public void shouldWriteManySuccessfully() {
-        // create a few tuples and the messages associated with the tuples
-        List<Tuple> tuples = createTuples(3);
-        List<JSONObject> messages = createMessages(3);
+        // create a few message ids and the messages associated with the ids
+        List<BulkMessage<JSONObject>> messages = createMessages(3);
 
         // create a document writer which will successfully write all
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
-        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
-        results.addSuccess(createDocument(messages.get(2), tuples.get(2)));
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0)));
+        results.addSuccess(createDocument(messages.get(1)));
+        results.addSuccess(createDocument(messages.get(2)));
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // response should only contain successes
         assertFalse(response.hasErrors());
-        assertTrue(response.getSuccesses().contains(tuples.get(0)));
-        assertTrue(response.getSuccesses().contains(tuples.get(1)));
-        assertTrue(response.getSuccesses().contains(tuples.get(2)));
+        assertTrue(response.getSuccesses().contains(new MessageId("message1")));
+        assertTrue(response.getSuccesses().contains(new MessageId("message2")));
+        assertTrue(response.getSuccesses().contains(new MessageId("message3")));
     }
 
     @Test
     public void shouldHandleWriteFailure() {
-        // create a tuple and a message associated with that tuple
-        List<Tuple> tuples = createTuples(1);
-        List<JSONObject> messages = createMessages(1);
+        // create a message id and a message associated with that id
+        List<BulkMessage<JSONObject>> messages = createMessages(3);
         Exception cause = new Exception();
 
         // create a document writer which will fail all writes
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0)), cause, "error");
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // the writer response should only contain failures
         assertEquals(0, response.getSuccesses().size());
         assertEquals(1, response.getErrors().size());
-        Collection<Tuple> errors = response.getErrors().get(cause);
-        assertTrue(errors.contains(tuples.get(0)));
+        Collection<MessageId> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(new MessageId("message1")));
     }
 
     @Test
     public void shouldHandleManyWriteFailures() {
-        // create a few tuples and the messages associated with the tuples
+        // create a few message ids and the messages associated with the ids
         int count = 3;
-        List<Tuple> tuples = createTuples(count);
-        List<JSONObject> messages = createMessages(count);
+        List<BulkMessage<JSONObject>> messages = createMessages(count);
         Exception cause = new Exception();
 
         // create a document writer which will fail all writes
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
-        results.addFailure(createDocument(messages.get(1), tuples.get(1)), cause, "error");
-        results.addFailure(createDocument(messages.get(2), tuples.get(2)), cause, "error");
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0)), cause, "error");
+        results.addFailure(createDocument(messages.get(1)), cause, "error");
+        results.addFailure(createDocument(messages.get(2)), cause, "error");
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // the writer response should only contain failures
         assertEquals(0, response.getSuccesses().size());
         assertEquals(1, response.getErrors().size());
-        Collection<Tuple> errors = response.getErrors().get(cause);
-        assertTrue(errors.contains(tuples.get(0)));
-        assertTrue(errors.contains(tuples.get(1)));
-        assertTrue(errors.contains(tuples.get(2)));
+        Collection<MessageId> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(new MessageId("message1")));
+        assertTrue(errors.contains(new MessageId("message2")));
+        assertTrue(errors.contains(new MessageId("message3")));
     }
 
     @Test
     public void shouldHandlePartialFailures() {
-        // create a few tuples and the messages associated with the tuples
+        // create a few message ids and the messages associated with the ids
         int count = 2;
-        List<Tuple> tuples = createTuples(count);
-        List<JSONObject> messages = createMessages(count);
+        List<BulkMessage<JSONObject>> messages = createMessages(count);
         Exception cause = new Exception();
 
         // create a document writer that will fail one and succeed the other
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
-        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0)), cause, "error");
+        results.addSuccess(createDocument(messages.get(1)));
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // response should contain some successes and some failures
         assertEquals(1, response.getSuccesses().size());
         assertEquals(1, response.getErrors().size());
-        assertTrue(response.getErrors().get(cause).contains(tuples.get(0)));
-        assertTrue(response.getSuccesses().contains(tuples.get(1)));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void shouldCheckIfNumberOfMessagesMatchNumberOfTuples() {
-        ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        esWriter.setDocumentWriter(mock(BulkDocumentWriter.class));
-        esWriter.init(stormConf, topologyContext, writerConfiguration);
-
-        // there are 5 tuples and only 1 message; there should be 5 messages to match the number of tuples
-        List<Tuple> tuples = createTuples(5);
-        List<JSONObject> messages = createMessages(1);
-
-        esWriter.write("bro", writerConfiguration, tuples, messages);
-        fail("expected exception");
+        assertTrue(response.getErrors().get(cause).contains(new MessageId("message1")));
+        assertTrue(response.getSuccesses().contains(new MessageId("message2")));
     }
 
     @Test
     public void shouldWriteSuccessfullyWhenMessageTimestampIsString() {
-        List<Tuple> tuples = createTuples(1);
-        List<JSONObject> messages = createMessages(1);
+        List<BulkMessage<JSONObject>> messages = createMessages(1);
+        JSONObject message = messages.get(0).getMessage();
 
         // the timestamp is a String, rather than a Long
-        messages.get(0).put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString());
 
         // create the document
-        JSONObject message = messages.get(0);
+
         String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName());
         String guid = (String) message.get(Constants.GUID);
         String sensorType = (String) message.get(Constants.SENSOR_TYPE);
-        TupleBasedDocument document = new TupleBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), tuples.get(0));
+        MessageIdBasedDocument document = new MessageIdBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), new MessageId("message1"));
 
         // create a document writer which will successfully write that document
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
         results.addSuccess(document);
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // response should only contain successes
         assertFalse(response.hasErrors());
-        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(new MessageId("message1")));
     }
 
     @Test
     public void shouldWriteSuccessfullyWhenMissingGUID() {
-        // create a tuple and a message associated with that tuple
-        List<Tuple> tuples = createTuples(1);
-        List<JSONObject> messages = createMessages(1);
+        // create a message id and a message associated with that tuple
+        List<BulkMessage<JSONObject>> messages = createMessages(1);
 
         // remove the GUID from the message
-        assertNotNull(messages.get(0).remove(Constants.GUID));
+        assertNotNull(messages.get(0).getMessage().remove(Constants.GUID));
 
         // create a document writer which will successfully write all
-        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
-        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
-        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        BulkDocumentWriterResults<MessageIdBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0)));
+        BulkDocumentWriter<MessageIdBasedDocument> docWriter = mock(BulkDocumentWriter.class);
         when(docWriter.write()).thenReturn(results);
 
         // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
         esWriter.setDocumentWriter(docWriter);
         esWriter.init(stormConf, topologyContext, writerConfiguration);
-        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, messages);
 
         // response should only contain successes
         assertFalse(response.hasErrors());
-        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(new MessageId("message1")));
     }
 
-    private TupleBasedDocument createDocument(JSONObject message, Tuple tuple) {
-        Long timestamp = (Long) message.get(Constants.Fields.TIMESTAMP.getName());
+    private MessageIdBasedDocument createDocument(BulkMessage<JSONObject> bulkWriterMessage) {
+        MessageId messageId = bulkWriterMessage.getId();
+        JSONObject message = bulkWriterMessage.getMessage();
+        Long timestamp = (Long) bulkWriterMessage.getMessage().get(Constants.Fields.TIMESTAMP.getName());
         String guid = (String) message.get(Constants.GUID);
         String sensorType = (String) message.get(Constants.SENSOR_TYPE);
-        return new TupleBasedDocument(message, guid, sensorType, timestamp, tuple);
+        return new MessageIdBasedDocument(message, guid, sensorType, timestamp, messageId);
     }
 
     private JSONObject message() {
@@ -289,18 +271,10 @@ public class ElasticsearchWriterTest {
         return globals;
     }
 
-    private List<Tuple> createTuples(int count) {
-        List<Tuple> tuples = new ArrayList<>();
-        for(int i=0; i<count; i++) {
-            tuples.add(mock(Tuple.class));
-        }
-        return tuples;
-    }
-
-    private List<JSONObject> createMessages(int count) {
-        List<JSONObject> messages = new ArrayList<>();
+    private List<BulkMessage<JSONObject>> createMessages(int count) {
+        List<BulkMessage<JSONObject>> messages = new ArrayList<>();
         for(int i=0; i<count; i++) {
-            messages.add(message());
+            messages.add(new BulkMessage<>(new MessageId("message" + (i + 1)), message()));
         }
         return messages;
     }
diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
index d0e9f67..10507c7 100644
--- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
+++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/writer/SimpleHbaseEnrichmentWriter.java
@@ -18,8 +18,9 @@
 
 package org.apache.metron.enrichment.writer;
 
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.hadoop.conf.Configuration;
@@ -313,8 +314,7 @@ public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkM
   @Override
   public BulkWriterResponse write(String sensorType
                     , WriterConfiguration configurations
-                    , Iterable<Tuple> tuples
-                    , List<JSONObject> messages
+                    , List<BulkMessage<JSONObject>> messages
                     ) throws Exception
   {
     Map<String, Object> sensorConfig = configurations.getSensorConfig(sensorType);
@@ -324,9 +324,9 @@ public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkM
     String enrichmentType = enrichmentTypeObj == null?null:enrichmentTypeObj.toString();
     Set<String> valueColumns = new HashSet<>(getColumns(Configurations.VALUE_COLUMNS.get(sensorConfig), true));
     List<Put> puts = new ArrayList<>();
-    for(JSONObject message : messages) {
-      EnrichmentKey key = getKey(message, transformer, enrichmentType);
-      EnrichmentValue value = getValue(message, transformer.keySet, valueColumns);
+    for(BulkMessage<JSONObject> bulkWriterMessage : messages) {
+      EnrichmentKey key = getKey(bulkWriterMessage.getMessage(), transformer, enrichmentType);
+      EnrichmentValue value = getValue(bulkWriterMessage.getMessage(), transformer.keySet, valueColumns);
       if(key == null || value == null) {
         continue;
       }
@@ -336,17 +336,17 @@ public class SimpleHbaseEnrichmentWriter extends AbstractWriter implements BulkM
         puts.add(put);
       }
     }
-
+    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
     BulkWriterResponse response = new BulkWriterResponse();
     try {
       table.put(puts);
     } catch (Exception e) {
-      response.addAllErrors(e, tuples);
+      response.addAllErrors(e, ids);
       return response;
     }
 
     // Can return no errors, because put will throw Exception on error.
-    response.addAllSuccesses(tuples);
+    response.addAllSuccesses(ids);
     return response;
   }
 
diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
deleted file mode 100644
index f46cfde..0000000
--- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/writer/HBaseWriter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.hbase.writer;
-
-import org.apache.storm.tuple.Tuple;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.hbase.HTableProvider;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.common.utils.ReflectionUtils;
-import org.apache.metron.common.writer.MessageWriter;
-import org.json.simple.JSONObject;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-
-public abstract class HBaseWriter implements MessageWriter<JSONObject>, Serializable {
-
-  private String tableName;
-  private String connectorImpl;
-  private TableProvider provider;
-  private HTableInterface table;
-
-  public HBaseWriter(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public HBaseWriter withProviderImpl(String connectorImpl) {
-    this.connectorImpl = connectorImpl;
-    return this;
-  }
-
-  @Override
-  public void init() {
-    final Configuration config = HBaseConfiguration.create();
-    try {
-      provider = ReflectionUtils.createInstance(connectorImpl, new HTableProvider());
-      table = provider.getTable(config, tableName);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Override
-  public void write(String sourceType, WriterConfiguration configurations, Tuple tuple, JSONObject message) throws Exception {
-    Put put = new Put(getKey(tuple, message));
-    Map<String, byte[]> values = getValues(tuple, message);
-    for(String column: values.keySet()) {
-      String[] columnParts = column.split(":");
-      long timestamp = getTimestamp(tuple, message);
-      if (timestamp > -1) {
-        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), timestamp, values.get(column));
-      } else {
-        put.addColumn(Bytes.toBytes(columnParts[0]), Bytes.toBytes(columnParts[1]), values.get(column));
-      }
-    }
-    table.put(put);
-  }
-
-  @Override
-  public void close() throws Exception {
-    table.close();
-  }
-
-  public abstract byte[] getKey(Tuple tuple, JSONObject message);
-  public abstract long getTimestamp(Tuple tuple, JSONObject message);
-  public abstract Map<String, byte[]> getValues(Tuple tuple, JSONObject message);
-}
diff --git a/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/parsed/jsonMapExampleParsed b/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/parsed/jsonMapExampleParsed
index e614bda..b5658d5 100644
--- a/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/parsed/jsonMapExampleParsed
+++ b/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/parsed/jsonMapExampleParsed
@@ -1,2 +1,10 @@
-{ "string" : "bar", "number" : 2, "ignored" : [ "blah" ], "original_string":"{ \"string\" : \"bar\", \"number\" : 2, \"ignored\" : [ \"blah\" ] }","timestamp":1000000000000, "source.type":"jsonMapQuery","guid":"this-is-random-uuid-will-be-36-chars" }
+{ "number" : 1, "ignored" : [ "blah" ], "original_string":"{ \"string\" : \"bar\", \"number\" : 1, \"ignored\" : [ \"blah\" ] }","string" : "bar","timestamp":1000000000000, "source.type":"jsonMapQuery","guid":"this-is-random-uuid-will-be-36-chars" }
+{ "number" : 2 , "original_string" : "{ \"number\" : 2 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 3 , "original_string" : "{ \"number\" : 3 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 4 , "original_string" : "{ \"number\" : 4 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 5 , "original_string" : "{ \"number\" : 5 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 6 , "original_string" : "{ \"number\" : 6 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
 { "number" : 7 , "original_string" : "{ \"number\" : 7 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 8 , "original_string" : "{ \"number\" : 8 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 9 , "original_string" : "{ \"number\" : 9 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
+{ "number" : 10 , "original_string" : "{ \"number\" : 10 }", "source.type":"jsonMapQuery","timestamp":1000000000000,"guid":"this-is-random-uuid-will-be-36-chars"}
\ No newline at end of file
diff --git a/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput b/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput
index 8f25f4f..58ba68a 100644
--- a/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput
+++ b/metron-platform/metron-integration-test/src/main/sample/data/jsonMapQuery/raw/jsonMapExampleOutput
@@ -1 +1 @@
-{"foo":[{ "string" : "bar", "number" : 2, "ignored" : [ "blah" ] },{ "number" : 7 }]}
\ No newline at end of file
+{"foo":[{ "string" : "bar", "number" : 1, "ignored" : [ "blah" ] },{ "number" : 2 },{ "number" : 3 },{ "number" : 4 },{ "number" : 5 },{ "number" : 6 },{ "number" : 7 },{ "number" : 8 },{ "number" : 9 },{ "number" : 10 }]}
\ No newline at end of file
diff --git a/metron-platform/metron-parsing/metron-parsers-common/src/main/config/zookeeper/parsers/jsonMapQuery.json b/metron-platform/metron-parsing/metron-parsers-common/src/main/config/zookeeper/parsers/jsonMapQuery.json
index 7dad779..a3e43cd 100644
--- a/metron-platform/metron-parsing/metron-parsers-common/src/main/config/zookeeper/parsers/jsonMapQuery.json
+++ b/metron-platform/metron-parsing/metron-parsers-common/src/main/config/zookeeper/parsers/jsonMapQuery.json
@@ -1,5 +1,5 @@
 {
   "parserClassName":"org.apache.metron.parsers.json.JSONMapParser",
   "sensorTopic":"jsonMapQuery",
-  "parserConfig": {"jsonpQuery":"$.foo"}
+  "parserConfig": {"jsonpQuery":"$.foo", "batchSize": 5}
 }
\ No newline at end of file
diff --git a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
index da884f1..64d1d61 100644
--- a/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
+++ b/metron-platform/metron-parsing/metron-parsers-common/src/test/java/org/apache/metron/writers/SimpleHBaseEnrichmentWriterTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
@@ -72,9 +73,8 @@ public class SimpleHBaseEnrichmentWriterTest {
 
     writer.write( SENSOR_TYPE
             , configuration
-            , null
-            , new ArrayList<JSONObject>() {{
-              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            , new ArrayList<BulkMessage<JSONObject>>() {{
+              add(new BulkMessage<>("messageId", new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar"))));
             }}
     );
     List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
@@ -100,9 +100,8 @@ public class SimpleHBaseEnrichmentWriterTest {
 
     writer.write( SENSOR_TYPE
             , configuration
-            , null
-            , new ArrayList<JSONObject>() {{
-              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            , new ArrayList<BulkMessage<JSONObject>>() {{
+              add(new BulkMessage<>("messageId", new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar"))));
             }}
     );
     List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
@@ -128,9 +127,8 @@ public class SimpleHBaseEnrichmentWriterTest {
 
     writer.write( SENSOR_TYPE
             , configuration
-            , null
-            , new ArrayList<JSONObject>() {{
-              add(new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar")));
+            , new ArrayList<BulkMessage<JSONObject>>() {{
+              add(new BulkMessage<>("messageId", new JSONObject(ImmutableMap.of("ip", "localhost", "user", "cstella", "foo", "bar"))));
             }}
     );
     List<LookupKV<EnrichmentKey, EnrichmentValue>> values = getValues();
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
index d5f8d1e..21aa087 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/ParserBolt.java
@@ -23,9 +23,12 @@ import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredParserBolt;
 import org.apache.metron.common.configuration.ParserConfigurations;
@@ -37,6 +40,9 @@ import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.message.metadata.RawMessage;
 import org.apache.metron.common.message.metadata.RawMessageUtil;
 import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.parsers.ParserRunner;
 import org.apache.metron.parsers.ParserRunnerResults;
 import org.apache.metron.stellar.common.CachingStellarProcessor;
@@ -67,8 +73,9 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
   private transient MessageGetStrategy messageGetStrategy;
   private int requestedTickFreqSecs;
-  private int defaultBatchTimeout;
+  private int maxBatchTimeout;
   private int batchTimeoutDivisor = 1;
+  private transient AckTuplesPolicy ackTuplesPolicy;
 
   public ParserBolt( String zookeeperUrl
                    , ParserRunner parserRunner
@@ -77,31 +84,20 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     super(zookeeperUrl);
     this.parserRunner = parserRunner;
     this.sensorToWriterMap = sensorToWriterMap;
-
-    // Ensure that all sensors are either bulk sensors or not bulk sensors.  Can't mix and match.
-    Boolean handleAcks = null;
-    for (Map.Entry<String, WriterHandler> entry : sensorToWriterMap.entrySet()) {
-      boolean writerHandleAck = entry.getValue().handleAck();
-      if (handleAcks == null) {
-        handleAcks = writerHandleAck;
-      } else if (!handleAcks.equals(writerHandleAck)) {
-        throw new IllegalArgumentException("All writers must match when calling handleAck()");
-      }
-    }
   }
 
   /**
    * If this ParserBolt is in a topology where it is daisy-chained with
    * other queuing Writers, then the max amount of time it takes for a tuple
    * to clear the whole topology is the sum of all the batchTimeouts for all the
-   * daisy-chained Writers.  In the common case where each Writer is using the default
+   * daisy-chained Writers.  In the common case where each Writer is using the max
    * batchTimeout, it is then necessary to divide that batchTimeout by the number of
    * daisy-chained Writers.  There are no examples of daisy-chained batching Writers
    * in the current Metron topologies, but the feature is available as a "fluent"-style
    * mutator if needed.  It would be used in the parser topology builder.
    * Default value, if not otherwise set, is 1.
    *
-   * If non-default batchTimeouts are configured for some components, the administrator
+   * If sensor batchTimeouts are configured for some components, the administrator
    * may want to take this behavior into account.
    *
    * @param batchTimeoutDivisor
@@ -158,6 +154,13 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
   }
 
   /**
+   * Used only for unit testing
+   */
+  public void setAckTuplesPolicy(AckTuplesPolicy ackTuplesPolicy) {
+    this.ackTuplesPolicy = ackTuplesPolicy;
+  }
+
+  /**
    * This method is called by TopologyBuilder.createTopology() to obtain topology and
    * bolt specific configuration parameters.  We use it primarily to configure how often
    * a tick tuple will be sent to our bolt.
@@ -181,8 +184,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
 
     BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
     this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
-    //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent.
-    this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+    //And while we've got BatchTimeoutHelper handy, capture the maxBatchTimeout for writerComponent.
+    this.maxBatchTimeout = timeoutHelper.getMaxBatchTimeout();
 
     Map<String, Object> conf = super.getComponentConfiguration();
     if (conf == null) {
@@ -201,6 +204,8 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     this.collector = collector;
     this.parserRunner.init(this::getConfigurations, initializeStellar());
 
+    ackTuplesPolicy = new AckTuplesPolicy(collector, messageGetStrategy);
+
     // Need to prep all sensors
     for (Map.Entry<String, WriterHandler> entry: sensorToWriterMap.entrySet()) {
       String sensor = entry.getKey();
@@ -215,17 +220,17 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
       }
 
       WriterHandler writer = sensorToWriterMap.get(sensor);
-      writer.init(stormConf, context, collector, getConfigurations());
-      if (defaultBatchTimeout == 0) {
-        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+      if (maxBatchTimeout == 0) {
+        //This means getComponentConfiguration was never called to initialize maxBatchTimeout,
         //probably because we are in a unit test scenario.  So calculate it here.
         WriterConfiguration writerConfig = getConfigurationStrategy()
-            .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
+                .createWriterConfig(writer.getBulkMessageWriter(), getConfigurations());
         BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(
-            writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
-        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+                writerConfig::getAllConfiguredTimeouts, batchTimeoutDivisor);
+        maxBatchTimeout = timeoutHelper.getMaxBatchTimeout();
       }
-      writer.setDefaultBatchTimeout(defaultBatchTimeout);
+
+      writer.init(stormConf, context, collector, getConfigurations(), ackTuplesPolicy, maxBatchTimeout);
     }
   }
 
@@ -250,16 +255,25 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
               , sensorParserConfig.getRawMessageStrategyConfig()
       );
       ParserRunnerResults<JSONObject> parserRunnerResults = parserRunner.execute(sensorType, rawMessage, parserConfigurations);
-      long numWritten = parserRunnerResults.getMessages().stream()
-              .map(message -> handleMessage(sensorType, originalMessage, tuple, message, collector))
-              .filter(result -> result)
-              .count();
       parserRunnerResults.getErrors().forEach(error -> ErrorUtils.handleError(collector, error));
 
-      //if we are supposed to ack the tuple OR if we've never passed this tuple to the bulk writer
-      //(meaning that none of the messages are valid either globally or locally)
-      //then we want to handle the ack ourselves.
-      if (!sensorToWriterMap.get(sensorType).handleAck() || numWritten == 0) {
+      WriterHandler writer = sensorToWriterMap.get(sensorType);
+      int numWritten = 0;
+      List<JSONObject> messages = parserRunnerResults.getMessages();
+      List<String> messageIds = messages.stream().map(MessageUtils::getGuid).collect(Collectors.toList());
+      ackTuplesPolicy.addTupleMessageIds(tuple, messageIds);
+      for(int i = 0; i < messages.size(); i++) {
+        String messageId = messageIds.get(i);
+        JSONObject message = messages.get(i);
+        try {
+          writer.write(sensorType, new BulkMessage<>(messageId, message), getConfigurations());
+          numWritten++;
+        } catch (Exception ex) {
+          handleError(sensorType, originalMessage, tuple, ex, collector);
+        }
+      }
+
+      if (numWritten == 0) {
         collector.ack(tuple);
       }
 
@@ -306,17 +320,6 @@ public class ParserBolt extends ConfiguredParserBolt implements Serializable {
     }
   }
 
-  protected boolean handleMessage(String sensorType, byte[] originalMessage, Tuple tuple, JSONObject message, OutputCollector collector) {
-    WriterHandler writer = sensorToWriterMap.get(sensorType);
-    try {
-      writer.write(sensorType, tuple, message, getConfigurations(), messageGetStrategy);
-      return true;
-    } catch (Exception ex) {
-      handleError(sensorType, originalMessage, tuple, ex, collector);
-      return false;
-    }
-  }
-
   protected void handleError(String sensorType, byte[] originalMessage, Tuple tuple, Throwable ex, OutputCollector collector) {
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
index abf3e46..6a174f3 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterBolt.java
@@ -18,14 +18,15 @@
 
 package org.apache.metron.parsers.bolt;
 
-import java.util.Collections;
-import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.utils.ErrorUtils;
+import org.apache.metron.common.utils.MessageUtils;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -33,13 +34,20 @@ import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 
+import java.util.Collections;
+import java.util.Map;
+
 public class WriterBolt extends BaseRichBolt {
   private WriterHandler handler;
   private ParserConfigurations configuration;
   private String sensorType;
   private Constants.ErrorType errorType = Constants.ErrorType.DEFAULT_ERROR;
+  //In test scenarios, maxBatchTimeout may not be correctly initialized, so do it here.
+  //This is a conservative maxBatchTimeout for a vanilla bolt with batchTimeoutDivisor=2
+  public static final int UNINITIALIZED_MAX_BATCH_TIMEOUT = 6;
   private transient MessageGetStrategy messageGetStrategy;
   private transient OutputCollector collector;
+  private transient AckTuplesPolicy ackTuplesPolicy;
   public WriterBolt(WriterHandler handler, ParserConfigurations configuration, String sensorType) {
     this.handler = handler;
     this.configuration = configuration;
@@ -55,7 +63,8 @@ public class WriterBolt extends BaseRichBolt {
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     this.collector = collector;
     messageGetStrategy = MessageGetters.DEFAULT_JSON_FROM_FIELD.get();
-    handler.init(stormConf, context, collector, configuration);
+    ackTuplesPolicy = new AckTuplesPolicy(collector, messageGetStrategy);
+    handler.init(stormConf, context, collector, configuration, ackTuplesPolicy, UNINITIALIZED_MAX_BATCH_TIMEOUT);
   }
 
   private JSONObject getMessage(Tuple tuple) {
@@ -76,10 +85,9 @@ public class WriterBolt extends BaseRichBolt {
     JSONObject message = null;
     try {
       message = (JSONObject) messageGetStrategy.get(tuple);
-      handler.write(sensorType, tuple, message, configuration, messageGetStrategy);
-      if(!handler.handleAck()) {
-        collector.ack(tuple);
-      }
+      String messageId = MessageUtils.getGuid(message);
+      ackTuplesPolicy.addTupleMessageIds(tuple, Collections.singleton(messageId));
+      handler.write(sensorType, new BulkMessage<>(messageId, message), configuration);
     } catch (Throwable e) {
       MetronError error = new MetronError()
               .withErrorType(errorType)
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
index 3916dea..434db45 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/main/java/org/apache/metron/parsers/bolt/WriterHandler.java
@@ -20,8 +20,6 @@ package org.apache.metron.parsers.bolt;
 
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.function.Function;
 import org.apache.metron.common.configuration.ParserConfigurations;
@@ -31,12 +29,13 @@ import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFac
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,7 @@ public class WriterHandler implements Serializable {
   private BulkMessageWriter<JSONObject> messageWriter;
   private transient BulkWriterComponent<JSONObject> writerComponent;
   private transient Function<ParserConfigurations, WriterConfiguration> writerTransformer;
-  private boolean isBulk = false;
+  private boolean isBulk;
   private ConfigurationStrategy configStrategy = ConfigurationsStrategies.PARSERS;
 
   public WriterHandler(MessageWriter<JSONObject> writer) {
@@ -59,10 +58,6 @@ public class WriterHandler implements Serializable {
     messageWriter = writer;
   }
 
-  public boolean handleAck() {
-    return isBulk;
-  }
-
   public boolean isWriterToBulkWriter() {
     return messageWriter instanceof  WriterToBulkWriter;
   }
@@ -71,7 +66,8 @@ public class WriterHandler implements Serializable {
     return messageWriter;
   }
 
-  public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector, ParserConfigurations configurations) {
+  public void init(Map stormConf, TopologyContext topologyContext, OutputCollector collector, ParserConfigurations configurations,
+                   AckTuplesPolicy ackTuplesPolicy, int maxBatchTimeout) {
     if(isBulk) {
       writerTransformer = config -> configStrategy.createWriterConfig(messageWriter, config);
     }
@@ -83,21 +79,15 @@ public class WriterHandler implements Serializable {
     } catch (Exception e) {
       throw new IllegalStateException("Unable to initialize message writer", e);
     }
-    this.writerComponent = new BulkWriterComponent<JSONObject>(collector, isBulk, isBulk) {
-      @Override
-      protected Collection<Tuple> createTupleCollection() {
-        return new HashSet<>();
-      }
-    };
+    this.writerComponent = new BulkWriterComponent<>(maxBatchTimeout);
+    this.writerComponent.addFlushPolicy(ackTuplesPolicy);
   }
 
-  public void write( String sensorType
-                   , Tuple tuple
-                   , JSONObject message
-                   , ParserConfigurations configurations
-                   , MessageGetStrategy messageGetStrategy
-                   ) throws Exception {
-    writerComponent.write(sensorType, tuple, message, messageWriter, writerTransformer.apply(configurations), messageGetStrategy);
+  public void write(String sensorType
+          , BulkMessage<JSONObject> bulkWriterMessage
+          , ParserConfigurations configurations
+  ) throws Exception {
+    writerComponent.write(sensorType, bulkWriterMessage, messageWriter, writerTransformer.apply(configurations));
   }
 
   public void flush(ParserConfigurations configurations, MessageGetStrategy messageGetStrategy)
@@ -105,24 +95,8 @@ public class WriterHandler implements Serializable {
     if (!(messageWriter instanceof WriterToBulkWriter)) {
       //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
       LOG.debug("Flushing message queues older than their batchTimeouts");
-      writerComponent.flushTimeouts(messageWriter, writerTransformer.apply(configurations),
-          messageGetStrategy);
-    }
-  }
-
-  public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) {
-    writerComponent.errorAll(sensorType, e, messageGetStrategy);
-  }
-
-  /**
-   * Sets batch timeout on the underlying component
-   * @param defaultBatchTimeout
-   */
-  public void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    if (writerComponent == null) {
-      throw new UnsupportedOperationException("Must call init prior to calling this method.");
+      writerComponent.flushAll(messageWriter, writerTransformer.apply(configurations));
     }
-    writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
   }
 
 }
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
index 0166a63..90c882f 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/ParserBoltTest.java
@@ -24,6 +24,8 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.metadata.RawMessage;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.parsers.DefaultParserRunnerResults;
 import org.apache.metron.parsers.ParserRunnerImpl;
 import org.apache.metron.parsers.ParserRunnerResults;
@@ -35,29 +37,34 @@ import org.apache.storm.Config;
 import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.InOrder;
 import org.mockito.Mock;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class ParserBoltTest extends BaseBoltTest {
@@ -75,19 +82,19 @@ public class ParserBoltTest extends BaseBoltTest {
   private WriterHandler writerHandler;
 
   @Mock
-  private WriterHandler writerHandlerHandleAck;
-
-  @Mock
   private MessageGetStrategy messageGetStrategy;
 
   @Mock
   private Context stellarContext;
 
+  @Mock
+  private AckTuplesPolicy bulkWriterResponseHandler;
+
   private class MockParserRunner extends ParserRunnerImpl {
 
     private boolean isInvalid = false;
     private RawMessage rawMessage;
-    private JSONObject message;
+    private List<JSONObject> messages;
 
     public MockParserRunner(HashSet<String> sensorTypes) {
       super(sensorTypes);
@@ -97,14 +104,16 @@ public class ParserBoltTest extends BaseBoltTest {
     public ParserRunnerResults<JSONObject> execute(String sensorType, RawMessage rawMessage, ParserConfigurations parserConfigurations) {
       DefaultParserRunnerResults parserRunnerResults = new DefaultParserRunnerResults();
       this.rawMessage = rawMessage;
-      if (!isInvalid) {
-        parserRunnerResults.addMessage(message);
-      } else {
-        MetronError error = new MetronError()
-                .withErrorType(Constants.ErrorType.PARSER_INVALID)
-                .withSensorType(Collections.singleton(sensorType))
-                .addRawMessage(message);
-        parserRunnerResults.addError(error);
+      for(JSONObject message: messages) {
+        if (!isInvalid) {
+          parserRunnerResults.addMessage(message);
+        } else {
+          MetronError error = new MetronError()
+                  .withErrorType(Constants.ErrorType.PARSER_INVALID)
+                  .withSensorType(Collections.singleton(sensorType))
+                  .addRawMessage(message);
+          parserRunnerResults.addError(error);
+        }
       }
       return parserRunnerResults;
     }
@@ -113,8 +122,8 @@ public class ParserBoltTest extends BaseBoltTest {
       this.isInvalid = isInvalid;
     }
 
-    protected void setMessage(JSONObject message) {
-      this.message = message;
+    protected void setMessages(List<JSONObject> messages) {
+      this.messages = messages;
     }
 
     protected RawMessage getRawMessage() {
@@ -122,24 +131,6 @@ public class ParserBoltTest extends BaseBoltTest {
     }
   }
 
-  @Before
-  public void setup() {
-    when(writerHandler.handleAck()).thenReturn(false);
-    when(writerHandlerHandleAck.handleAck()).thenReturn(true);
-
-  }
-
-  @Test
-  public void shouldThrowExceptionOnDifferentHandleAck() {
-    exception.expect(IllegalArgumentException.class);
-    exception.expectMessage("All writers must match when calling handleAck()");
-
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
-      put("yaf", writerHandler);
-      put("bro", writerHandlerHandleAck);
-    }});
-  }
-
   @Test
   public void withBatchTimeoutDivisorShouldSetBatchTimeoutDivisor() {
     ParserBolt parserBolt = new ParserBolt("zookeeperUrl", parserRunner, new HashMap<String, WriterHandler>() {{
@@ -221,8 +212,7 @@ public class ParserBoltTest extends BaseBoltTest {
     Map<String, String> topicToSensorMap = parserBolt.getTopicToSensorMap();
     Assert.assertEquals(1, topicToSensorMap.size());
     Assert.assertEquals("yaf", topicToSensorMap.get("yafTopic"));
-    verify(writerHandler).init(stormConf, topologyContext, outputCollector, parserConfigurations);
-    verify(writerHandler).setDefaultBatchTimeout(14);
+    verify(writerHandler).init(eq(stormConf), eq(topologyContext), eq(outputCollector), eq(parserConfigurations), any(AckTuplesPolicy.class), eq(14));
   }
 
   @Test
@@ -276,7 +266,7 @@ public class ParserBoltTest extends BaseBoltTest {
     ParserConfigurations parserConfigurations = new ParserConfigurations();
     parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
 
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+    ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
       put("yaf", writerHandler);
     }}) {
 
@@ -284,38 +274,83 @@ public class ParserBoltTest extends BaseBoltTest {
       public ParserConfigurations getConfigurations() {
         return parserConfigurations;
       }
-    };
+    });
 
     parserBolt.setMessageGetStrategy(messageGetStrategy);
     parserBolt.setOutputCollector(outputCollector);
     parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
       put("yafTopic", "yaf");
     }});
+    parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
+
     JSONObject message = new JSONObject();
+    message.put(Constants.GUID, "messageId");
     message.put("field", "value");
-    mockParserRunner.setMessage(message);
+    mockParserRunner.setMessages(Collections.singletonList(message));
     RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
 
     {
-      // Verify the correct message is written and ack is handled
       parserBolt.execute(t1);
 
       Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
-      verify(writerHandler, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
-      verify(outputCollector, times(1)).ack(t1);
+      verify(bulkWriterResponseHandler).addTupleMessageIds(t1, Collections.singletonList("messageId"));
+      verify(writerHandler, times(1)).write("yaf", new BulkMessage<>("messageId", message), parserConfigurations);
     }
-    {
-      // Verify the tuple is not acked when the writer is set to handle ack
-      reset(outputCollector);
-      parserBolt.setSensorToWriterMap(new HashMap<String, WriterHandler>() {{
-        put("yaf", writerHandlerHandleAck);
-      }});
+  }
+
+  @Test
+  public void shouldExecuteOnSuccessWithMultipleMessages() throws Exception {
+    when(messageGetStrategy.get(t1)).thenReturn("originalMessage".getBytes(StandardCharsets.UTF_8));
+    when(t1.getStringByField(FieldsConfiguration.TOPIC.getFieldName())).thenReturn("yafTopic");
+    MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
+    ParserConfigurations parserConfigurations = new ParserConfigurations();
+    parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
+
+    ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+      put("yaf", writerHandler);
+    }}) {
 
+      @Override
+      public ParserConfigurations getConfigurations() {
+        return parserConfigurations;
+      }
+    });
+
+    parserBolt.setMessageGetStrategy(messageGetStrategy);
+    parserBolt.setOutputCollector(outputCollector);
+    parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
+      put("yafTopic", "yaf");
+    }});
+    parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
+
+    List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+    for(int i = 0; i < 5; i++) {
+      String messageId = String.format("messageId%s", i + 1);
+      JSONObject message = new JSONObject();
+      message.put(Constants.GUID, messageId);
+      message.put("field", String.format("value%s", i + 1));
+      messages.add(new BulkMessage<>(messageId, message));
+    }
+
+    mockParserRunner.setMessages(messages.stream().map(BulkMessage::getMessage).collect(Collectors.toList()));
+    RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
+
+    {
+      // Verify the correct message is written and ack is handled
       parserBolt.execute(t1);
 
-      verify(writerHandlerHandleAck, times(1)).write("yaf", t1, message, parserConfigurations, messageGetStrategy);
-      verify(outputCollector, times(0)).ack(t1);
+      Assert.assertEquals(expectedRawMessage, mockParserRunner.getRawMessage());
+
+      InOrder inOrder = inOrder(bulkWriterResponseHandler, writerHandler);
+
+      inOrder.verify(bulkWriterResponseHandler).addTupleMessageIds(t1, Arrays.asList("messageId1", "messageId2", "messageId3", "messageId4", "messageId5"));
+      inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(0), parserConfigurations);
+      inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(1), parserConfigurations);
+      inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(2), parserConfigurations);
+      inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(3), parserConfigurations);
+      inOrder.verify(writerHandler, times(1)).write("yaf", messages.get(4), parserConfigurations);
     }
+    verifyNoMoreInteractions(writerHandler, bulkWriterResponseHandler, outputCollector);
   }
 
   @Test
@@ -346,7 +381,7 @@ public class ParserBoltTest extends BaseBoltTest {
     }});
     JSONObject message = new JSONObject();
     message.put("field", "value");
-    mockParserRunner.setMessage(message);
+    mockParserRunner.setMessages(Collections.singletonList(message));
     RawMessage expectedRawMessage = new RawMessage("originalMessage".getBytes(StandardCharsets.UTF_8), new HashMap<>());
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_INVALID)
@@ -407,9 +442,9 @@ public class ParserBoltTest extends BaseBoltTest {
     MockParserRunner mockParserRunner = new MockParserRunner(new HashSet<String>() {{ add("yaf"); }});
     ParserConfigurations parserConfigurations = new ParserConfigurations();
     parserConfigurations.updateSensorParserConfig("yaf", new SensorParserConfig());
-    doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any(), any(), any());
+    doThrow(new IllegalStateException("write failed")).when(writerHandler).write(any(), any(), any());
 
-    ParserBolt parserBolt = new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
+    ParserBolt parserBolt = spy(new ParserBolt("zookeeperUrl", mockParserRunner, new HashMap<String, WriterHandler>() {{
       put("yaf", writerHandler);
     }}) {
 
@@ -417,16 +452,18 @@ public class ParserBoltTest extends BaseBoltTest {
       public ParserConfigurations getConfigurations() {
         return parserConfigurations;
       }
-    };
+    });
 
     parserBolt.setMessageGetStrategy(messageGetStrategy);
     parserBolt.setOutputCollector(outputCollector);
     parserBolt.setTopicToSensorMap(new HashMap<String, String>() {{
       put("yafTopic", "yaf");
     }});
+    parserBolt.setAckTuplesPolicy(bulkWriterResponseHandler);
     JSONObject message = new JSONObject();
+    message.put(Constants.GUID, "messageId");
     message.put("field", "value");
-    mockParserRunner.setMessage(message);
+    mockParserRunner.setMessages(Collections.singletonList(message));
 
     MetronError error = new MetronError()
             .withErrorType(Constants.ErrorType.PARSER_ERROR)
@@ -436,8 +473,8 @@ public class ParserBoltTest extends BaseBoltTest {
 
     parserBolt.execute(t1);
 
-    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
+    verify(bulkWriterResponseHandler, times(1)).addTupleMessageIds(t1, Collections.singletonList("messageId"));
+    verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
     verify(outputCollector, times(1)).reportError(any(IllegalStateException.class));
     verify(outputCollector, times(1)).ack(t1);
   }
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
index b04d8f7..68fc15f 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/bolt/WriterBoltTest.java
@@ -21,8 +21,10 @@ package org.apache.metron.parsers.bolt;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -40,6 +42,7 @@ import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.common.writer.MessageWriter;
 import org.apache.metron.test.bolt.BaseBoltTest;
 import org.apache.metron.test.error.MetronErrorJSONMatcher;
@@ -53,6 +56,10 @@ import org.junit.Test;
 import org.mockito.Mock;
 
 public class WriterBoltTest extends BaseBoltTest{
+
+  private static final String MESSAGE_ID_FORMAT = "messageId%d";
+  private static final String MESSAGE_FORMAT = "message%d";
+
   @Mock
   protected TopologyContext topologyContext;
 
@@ -84,32 +91,39 @@ public class WriterBoltTest extends BaseBoltTest{
   public void testBatchHappyPath() throws Exception {
     ParserConfigurations configurations = getConfigurations(5);
     String sensorType = "test";
+    WriterBolt bolt = spy(new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType));
     List<Tuple> tuples = new ArrayList<>();
+    List<MessageId> messageIds = new ArrayList<>();
     for(int i = 0;i < 5;++i) {
       Tuple t = mock(Tuple.class);
-      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      String messageId = String.format(MESSAGE_ID_FORMAT, i + 1);
+      messageIds.add(new MessageId(messageId));
+      JSONObject message = new JSONObject();
+      message.put(Constants.GUID, messageId);
+      message.put("value", String.format(MESSAGE_FORMAT, i + 1));
+      when(t.getValueByField(eq("message"))).thenReturn(message);
       tuples.add(t);
     }
-    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(batchWriter, times(1)).init(any(), any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);
       verify(outputCollector, times(0)).ack(t);
-      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any());
     }
 
     // Ensure the batch returns the good Tuples
     BulkWriterResponse writerResponse = new BulkWriterResponse();
-    writerResponse.addAllSuccesses(tuples);
-    when(batchWriter.write(any(), any(), any(), any())).thenReturn(writerResponse);
+    writerResponse.addAllSuccesses(messageIds);
+    when(batchWriter.write(any(), any(), any())).thenReturn(writerResponse);
 
     bolt.execute(tuples.get(4));
     for(Tuple t : tuples) {
       verify(outputCollector, times(1)).ack(t);
     }
-    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(0)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
   }
@@ -125,7 +139,7 @@ public class WriterBoltTest extends BaseBoltTest{
     verify(writer, times(1)).init();
     bolt.execute(t);
     verify(outputCollector, times(1)).ack(t);
-    verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(writer, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(0)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
   }
@@ -140,7 +154,7 @@ public class WriterBoltTest extends BaseBoltTest{
     verify(writer, times(1)).init();
     bolt.execute(t);
     verify(outputCollector, times(1)).ack(t);
-    verify(writer, times(0)).write(eq(sensorType), any(), any(), any());
+    verify(writer, times(0)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
   }
@@ -153,17 +167,17 @@ public class WriterBoltTest extends BaseBoltTest{
     when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
     WriterBolt bolt = new WriterBolt(new WriterHandler(writer), configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    doThrow(new Exception("write error")).when(writer).write(any(), any(), any(), any());
+    doThrow(new Exception("write error")).when(writer).write(any(), any(), any());
     verify(writer, times(1)).init();
     bolt.execute(t);
     verify(outputCollector, times(1)).ack(t);
-    verify(writer, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(writer, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
 
     MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.DEFAULT_ERROR)
-            .withThrowable(new IllegalStateException("Unhandled bulk errors in response: {java.lang.Exception: write error=[tuple]}"))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(new Exception("write error"))
             .withSensorType(Collections.singleton(sensorType))
             .addRawMessage(new JSONObject());
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), argThat(new MetronErrorJSONMatcher(error.getJSONObject())));
@@ -173,10 +187,16 @@ public class WriterBoltTest extends BaseBoltTest{
   public void testBatchErrorPath() throws Exception {
     ParserConfigurations configurations = getConfigurations(5);
     String sensorType = "test";
+    WriterBolt bolt = spy(new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType));
     List<Tuple> tuples = new ArrayList<>();
+    List<MessageId> messageIds = new ArrayList<>();
     for(int i = 0;i < 4;++i) {
       Tuple t = mock(Tuple.class);
-      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      String messageId = String.format(MESSAGE_ID_FORMAT, i + 1);
+      messageIds.add(new MessageId(messageId));
+      JSONObject message = new JSONObject();
+      message.put("value", String.format(MESSAGE_FORMAT, i + 1));
+      when(t.getValueByField(eq("message"))).thenReturn(message);
       tuples.add(t);
     }
     Tuple errorTuple = mock(Tuple.class);
@@ -184,7 +204,7 @@ public class WriterBoltTest extends BaseBoltTest{
     when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
     when(errorTuple.getValueByField(eq("message"))).thenThrow(new IllegalStateException());
 
-    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
+
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(batchWriter, times(1)).init(any(), any(), any());
 
@@ -192,14 +212,14 @@ public class WriterBoltTest extends BaseBoltTest{
       Tuple t = tuples.get(i);
       bolt.execute(t);
       verify(outputCollector, times(0)).ack(t);
-      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any());
     }
 
     // Add the good tuples.  Do not add the error tuple, because this is testing an exception on access, not a failure on write.
     BulkWriterResponse writerResponse = new BulkWriterResponse();
-    writerResponse.addAllSuccesses(tuples);
-    writerResponse.addSuccess(goodTuple);
-    when(batchWriter.write(any(), any(), any(), any())).thenReturn(writerResponse);
+    writerResponse.addAllSuccesses(messageIds);
+    writerResponse.addSuccess(new MessageId("goodMessage"));
+    when(batchWriter.write(any(), any(), any())).thenReturn(writerResponse);
 
     bolt.execute(errorTuple);
     for(Tuple t : tuples) {
@@ -210,7 +230,7 @@ public class WriterBoltTest extends BaseBoltTest{
       verify(outputCollector, times(1)).ack(t);
     }
     verify(outputCollector, times(1)).ack(goodTuple);
-    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
   }
@@ -219,18 +239,30 @@ public class WriterBoltTest extends BaseBoltTest{
   public void testBatchErrorWriteFailure() throws Exception {
     ParserConfigurations configurations = getConfigurations(6);
     String sensorType = "test";
+    WriterBolt bolt = spy(new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType));
     List<Tuple> tuples = new ArrayList<>();
+    List<MessageId> messageIds = new ArrayList<>();
     for(int i = 0;i < 4;++i) {
       Tuple t = mock(Tuple.class);
-      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      String messageId = String.format(MESSAGE_ID_FORMAT, i + 1);
+      messageIds.add(new MessageId(messageId));
+      JSONObject message = new JSONObject();
+      message.put(Constants.GUID, messageId);
+      message.put("value", String.format(MESSAGE_FORMAT, i + 1));
+      when(t.getValueByField(eq("message"))).thenReturn(message);
       tuples.add(t);
     }
     Tuple errorTuple = mock(Tuple.class);
     Tuple goodTuple = mock(Tuple.class);
-    when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
-    when(errorTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
+    JSONObject goodMessage = new JSONObject();
+    goodMessage.put(Constants.GUID, "goodMessageId");
+    goodMessage.put("value", "goodMessage");
+    JSONObject errorMessage = new JSONObject();
+    goodMessage.put(Constants.GUID, "errorMessageId");
+    errorMessage.put("value", "errorMessage");
+    when(goodTuple.getValueByField(eq("message"))).thenReturn(goodMessage);
+    when(errorTuple.getValueByField(eq("message"))).thenReturn(errorMessage);
 
-    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
     verify(batchWriter, times(1)).init(any(), any(), any());
 
@@ -238,15 +270,15 @@ public class WriterBoltTest extends BaseBoltTest{
       Tuple t = tuples.get(i);
       bolt.execute(t);
       verify(outputCollector, times(0)).ack(t);
-      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any());
     }
 
     // Add both the good and error Tuples. This simulates a seemingly good Tuple that fails on write.
     BulkWriterResponse writerResponse = new BulkWriterResponse();
-    writerResponse.addAllSuccesses(tuples);
-    writerResponse.addSuccess(goodTuple);
-    writerResponse.addError(new IllegalStateException(), errorTuple);
-    when(batchWriter.write(any(), any(), any(), any())).thenReturn(writerResponse);
+    writerResponse.addAllSuccesses(messageIds);
+    writerResponse.addSuccess(new MessageId("goodMessageId"));
+    writerResponse.addError(new IllegalStateException(), new MessageId("errorMessageId"));
+    when(batchWriter.write(any(), any(), any())).thenReturn(writerResponse);
     bolt.execute(errorTuple);
     for(Tuple t : tuples) {
       verify(outputCollector, times(0)).ack(t);
@@ -258,7 +290,7 @@ public class WriterBoltTest extends BaseBoltTest{
       verify(outputCollector, times(1)).ack(t);
     }
     verify(outputCollector, times(1)).ack(goodTuple);
-    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
   }
@@ -267,32 +299,38 @@ public class WriterBoltTest extends BaseBoltTest{
   public void testBatchErrorPathExceptionInWrite() throws Exception {
     ParserConfigurations configurations = getConfigurations(5);
     String sensorType = "test";
+    WriterBolt bolt = spy(new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType));
     List<Tuple> tuples = new ArrayList<>();
+    List<String> messageIds = new ArrayList<>();
     for(int i = 0;i < 4;++i) {
       Tuple t = mock(Tuple.class);
-      when(t.getValueByField(eq("message"))).thenReturn(new JSONObject());
+      String messageId = String.format(MESSAGE_ID_FORMAT, i + 1);
+      messageIds.add(messageId);
+      JSONObject message = new JSONObject();
+      message.put("value", String.format(MESSAGE_FORMAT, i + 1));
+      when(t.getValueByField(eq("message"))).thenReturn(message);
       tuples.add(t);
     }
     Tuple goodTuple = mock(Tuple.class);
     when(goodTuple.getValueByField(eq("message"))).thenReturn(new JSONObject());
 
-    WriterBolt bolt = new WriterBolt(new WriterHandler(batchWriter), configurations, sensorType);
     bolt.prepare(new HashMap(), topologyContext, outputCollector);
-    doThrow(new Exception()).when(batchWriter).write(any(), any(), any(), any());
+    doThrow(new Exception()).when(batchWriter).write(any(), any(), any());
     verify(batchWriter, times(1)).init(any(), any(), any());
     for(int i = 0;i < 4;++i) {
       Tuple t = tuples.get(i);
       bolt.execute(t);
       verify(outputCollector, times(0)).ack(t);
-      verify(batchWriter, times(0)).write(eq(sensorType), any(), any(), any());
+      verify(batchWriter, times(0)).write(eq(sensorType), any(), any());
     }
     UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL);
+
     bolt.execute(goodTuple);
     UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
     for(Tuple t : tuples) {
       verify(outputCollector, times(1)).ack(t);
     }
-    verify(batchWriter, times(1)).write(eq(sensorType), any(), any(), any());
+    verify(batchWriter, times(1)).write(eq(sensorType), any(), any());
     verify(outputCollector, times(1)).ack(goodTuple);
     verify(outputCollector, times(1)).reportError(any());
     verify(outputCollector, times(0)).fail(any());
diff --git a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
index bfa1467..66b6c2b 100644
--- a/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
+++ b/metron-platform/metron-parsing/metron-parsing-storm/src/test/java/org/apache/metron/parsers/integration/validation/StormParserDriver.java
@@ -19,21 +19,26 @@ package org.apache.metron.parsers.integration.validation;
 
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.integration.ProcessorResult;
 import org.apache.metron.parsers.bolt.ParserBolt;
 import org.apache.metron.parsers.bolt.WriterHandler;
@@ -60,10 +65,11 @@ public class StormParserDriver extends ParserDriver {
     }
 
     @Override
-    public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
-      messages.forEach(message -> output.add(message.toJSONString().getBytes()));
+    public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
+      messages.forEach(bulkWriterMessage -> output.add(bulkWriterMessage.getMessage().toJSONString().getBytes()));
+      Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
       BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
-      bulkWriterResponse.addAllSuccesses(tuples);
+      bulkWriterResponse.addAllSuccesses(ids);
       return bulkWriterResponse;
     }
 
@@ -92,7 +98,7 @@ public class StormParserDriver extends ParserDriver {
 
     @Override
     public ParserConfigurations getConfigurations() {
-      config.getSensorParserConfig(sensorType).getParserConfig().put(IndexingConfigurations.BATCH_SIZE_CONF, 1);
+      config.getSensorParserConfig(sensorType).getParserConfig().putIfAbsent(IndexingConfigurations.BATCH_SIZE_CONF, 1);
       return config;
     }
 
@@ -114,21 +120,6 @@ public class StormParserDriver extends ParserDriver {
     }
   }
 
-//
-//  private ParserConfigurations config;
-//  private String sensorType;
-//  private ParserRunner parserRunner;
-//
-//  public ParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException {
-//    SensorParserConfig sensorParserConfig = SensorParserConfig.fromBytes(parserConfig.getBytes());
-//    this.sensorType = sensorType == null ? sensorParserConfig.getSensorTopic() : sensorType;
-//    config = new ParserConfigurations();
-//    config.updateSensorParserConfig(this.sensorType, SensorParserConfig.fromBytes(parserConfig.getBytes()));
-//    config.updateGlobalConfig(JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER));
-//
-//    parserRunner = new ParserRunnerImpl(new HashSet<String>() {{
-//      add(sensorType);
-//    }});
   public StormParserDriver(String sensorType, String parserConfig, String globalConfig) throws IOException {
     super(sensorType, parserConfig, globalConfig);
   }
@@ -140,7 +131,9 @@ public class StormParserDriver extends ParserDriver {
     OutputCollector collector = mock(OutputCollector.class);
     bolt.prepare(null, null, collector);
     for(byte[] record : in) {
-      bolt.execute(toTuple(record));
+      Tuple tuple = toTuple(record);
+      bolt.execute(tuple);
+      verify(collector, times(1)).ack(tuple);
     }
     return bolt.getResults();
   }
diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java b/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
deleted file mode 100644
index 153a1aa..0000000
--- a/metron-platform/metron-pcap/src/main/java/org/apache/metron/pcap/writer/PcapWriter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.pcap.writer;
-
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.hbase.writer.HBaseWriter;
-import org.apache.metron.pcap.utils.PcapUtils;
-import org.json.simple.JSONObject;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class PcapWriter extends HBaseWriter {
-
-  private String column;
-
-  public PcapWriter(String tableName, String column) {
-    super(tableName);
-    this.column = column;
-  }
-
-  @Override
-  public byte[] getKey(Tuple tuple, JSONObject message) {
-    String key = PcapUtils.getSessionKey(message);
-    return key.getBytes();
-  }
-
-  @Override
-  public long getTimestamp(Tuple tuple, JSONObject message) {
-    return (long) message.get("ts_micro");
-  }
-
-  @Override
-  public Map<String, byte[]> getValues(Tuple tuple, JSONObject message) {
-    Map<String, byte[]> values = new HashMap<>();
-    values.put(column, tuple.getBinary(0));
-    return values;
-  }
-
-  @Override
-  public String getName() {
-    return "pcap";
-  }
-}
diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
index 0289398..6fb2b35 100644
--- a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
+++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/writer/SolrWriter.java
@@ -29,13 +29,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.solr.SolrConstants;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.solr.client.solrj.SolrServerException;
@@ -46,7 +51,6 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -168,10 +172,11 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
   }
 
-  public Collection<SolrInputDocument> toDocs(Iterable<JSONObject> messages) {
+  public Collection<SolrInputDocument> toDocs(Iterable<BulkMessage<JSONObject>> messages) {
     Collection<SolrInputDocument> ret = new ArrayList<>();
-    for(JSONObject message: messages) {
+    for(BulkMessage<JSONObject> bulkWriterMessage: messages) {
       SolrInputDocument document = new SolrInputDocument();
+      JSONObject message = bulkWriterMessage.getMessage();
       for (Object key : message.keySet()) {
         Object value = message.get(key);
         if (value instanceof Iterable) {
@@ -199,30 +204,31 @@ public class SolrWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public BulkWriterResponse write(String sourceType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
     String collection = getCollection(sourceType, configurations);
     BulkWriterResponse bulkResponse = new BulkWriterResponse();
     Collection<SolrInputDocument> docs = toDocs(messages);
+    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
     try {
       Optional<SolrException> exceptionOptional = fromUpdateResponse(solr.add(collection, docs));
       // Solr commits the entire batch or throws an exception for it.  There's no way to get partial failures.
       if(exceptionOptional.isPresent()) {
-        bulkResponse.addAllErrors(exceptionOptional.get(), tuples);
+        bulkResponse.addAllErrors(exceptionOptional.get(), ids);
       }
       else {
         if (shouldCommit) {
           exceptionOptional = fromUpdateResponse(solr.commit(collection, waitFlush, waitSearcher, softCommit));
           if(exceptionOptional.isPresent()) {
-            bulkResponse.addAllErrors(exceptionOptional.get(), tuples);
+            bulkResponse.addAllErrors(exceptionOptional.get(), ids);
           }
         }
         if(!exceptionOptional.isPresent()) {
-          bulkResponse.addAllSuccesses(tuples);
+          bulkResponse.addAllSuccesses(ids);
         }
       }
     }
     catch(HttpSolrClient.RemoteSolrException sse) {
-      bulkResponse.addAllErrors(sse, tuples);
+      bulkResponse.addAllErrors(sse, ids);
     }
 
     return bulkResponse;
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
index 1a8e290..a4908ad 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/schema/SchemaValidationIntegrationTest.java
@@ -21,25 +21,20 @@ import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
 import org.apache.metron.solr.integration.components.SolrComponent;
 import org.apache.metron.solr.writer.SolrWriter;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.storm.tuple.Tuple;
-import org.apache.zookeeper.KeeperException;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mock;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.*;
 
 import static org.apache.metron.solr.SolrConstants.SOLR_ZOOKEEPER;
-import static org.mockito.Mockito.mock;
 
 public class SchemaValidationIntegrationTest {
   public static Iterable<String> getData(String sensor) throws IOException {
@@ -96,27 +91,25 @@ public class SchemaValidationIntegrationTest {
       component.addCollection(String.format("%s", sensorType), String.format("src/main/config/schema/%s", sensorType));
       Map<String, Object> globalConfig = getGlobalConfig(sensorType, component);
 
-      List<JSONObject> inputs = new ArrayList<>();
-      List<Tuple> tuples = new ArrayList<>();
+      List<BulkMessage<JSONObject>> messages = new ArrayList<>();
       Map<String, Map<String, Object>> index = new HashMap<>();
+      int i = 0;
       for (String message : getData(sensorType)) {
         if (message.trim().length() > 0) {
-          Tuple t = mock(Tuple.class);
-          tuples.add(t);
           Map<String, Object> m = JSONUtils.INSTANCE.load(message.trim(), JSONUtils.MAP_SUPPLIER);
           String guid = getGuid(m);
           index.put(guid, m);
-          inputs.add(new JSONObject(m));
+          messages.add(new BulkMessage<>(String.format("message%d", ++i), new JSONObject(m)));
         }
       }
-      Assert.assertTrue(inputs.size() > 0);
+      Assert.assertTrue(messages.size() > 0);
 
       SolrWriter solrWriter = new SolrWriter();
 
       WriterConfiguration writerConfig = new WriterConfiguration() {
         @Override
         public int getBatchSize(String sensorName) {
-          return inputs.size();
+          return messages.size();
         }
 
         @Override
@@ -143,7 +136,7 @@ public class SchemaValidationIntegrationTest {
         public Map<String, Object> getSensorConfig(String sensorName) {
           return new HashMap<String, Object>() {{
             put("index", sensorType);
-            put("batchSize", inputs.size());
+            put("batchSize", messages.size());
             put("enabled", true);
           }};
         }
@@ -166,7 +159,7 @@ public class SchemaValidationIntegrationTest {
 
       solrWriter.init(null, null, writerConfig);
 
-      BulkWriterResponse response = solrWriter.write(sensorType, writerConfig, tuples, inputs);
+      BulkWriterResponse response = solrWriter.write(sensorType, writerConfig, messages);
       Assert.assertTrue(response.getErrors().isEmpty());
       for (Map<String, Object> m : component.getAllIndexedDocs(sensorType)) {
         Map<String, Object> expected = index.get(getGuid(m));
diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
index 7b7d208..0f8dab1 100644
--- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
+++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/writer/SolrWriterTest.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.enrichment.integration.utils.SampleUtil;
 import org.apache.solr.client.solrj.request.QueryRequest;
 import org.apache.solr.common.SolrInputDocument;
@@ -113,9 +114,9 @@ public class SolrWriterTest {
     message2.put(Constants.SENSOR_TYPE, "test");
     message2.put("intField", 200);
     message2.put("doubleField", 200.0);
-    List<JSONObject> messages = new ArrayList<>();
-    messages.add(message1);
-    messages.add(message2);
+    List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+    messages.add(new BulkMessage<>("message1", message1));
+    messages.add(new BulkMessage<>("message2", message2));
 
     String collection = "metron";
     MetronSolrClient solr = Mockito.mock(MetronSolrClient.class);
@@ -131,7 +132,7 @@ public class SolrWriterTest {
     writer.init(null, null, new IndexingWriterConfiguration("solr", configurations));
     verify(solr, times(1)).setDefaultCollection(collection);
 
-    writer.write("test", new IndexingWriterConfiguration("solr", configurations), new ArrayList<>(), messages);
+    writer.write("test", new IndexingWriterConfiguration("solr", configurations), messages);
     verify(solr, times(1)).add(eq("yaf"), argThat(new SolrInputDocumentMatcher(ImmutableList.of(message1, message2))));
     verify(solr, times(1)).commit("yaf"
                                  , (boolean)SolrWriter.SolrProperties.COMMIT_WAIT_FLUSH.defaultValue.get()
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
new file mode 100644
index 0000000..64685bf
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/AckTuplesPolicy.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link org.apache.metron.writer.FlushPolicy} implementation for Storm that handles tuple acking and error
+ * reporting by handling flush events for writer responses.
+ */
+public class AckTuplesPolicy<MESSAGE_T> implements FlushPolicy<MESSAGE_T> {
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Tracks the messages from a tuple that have not been flushed
+  private Map<Tuple, Collection<MessageId>> tupleMessageMap = new HashMap<>();
+
+  // Tracks the errors that have been reported for a Tuple.  We only want to report an error once.
+  private Map<Tuple, Set<Throwable>> tupleErrorMap = new HashMap<>();
+  // Storm collector used to ack tuples, emit MetronErrors and report errors to the topology
+  private OutputCollector collector;
+  // Recovers the original raw message from a tuple when building a MetronError
+  private MessageGetStrategy messageGetStrategy;
+
+  /**
+   * @param collector Storm output collector used for acking and error reporting
+   * @param messageGetStrategy strategy for extracting the raw message from a tuple for error reports
+   */
+  public AckTuplesPolicy(OutputCollector collector, MessageGetStrategy messageGetStrategy) {
+    this.collector = collector;
+    this.messageGetStrategy = messageGetStrategy;
+  }
+
+  // Used only for unit testing
+  protected Map<Tuple, Collection<MessageId>> getTupleMessageMap() {
+    return tupleMessageMap;
+  }
+
+  // Used only for unit testing
+  protected Map<Tuple, Set<Throwable>> getTupleErrorMap() {
+    return tupleErrorMap;
+  }
+
+  /**
+   * Always returns false: this policy never triggers a flush itself.  It only reacts to flushes
+   * (see {@link #onFlush(String, BulkWriterResponse)}) in order to ack tuples and report errors.
+   */
+  @Override
+  public boolean shouldFlush(String sensorType, WriterConfiguration configurations, List<BulkMessage<MESSAGE_T>> messages) {
+    return false;
+  }
+
+  /**
+   * Handles a flushed batch: removes the flushed message ids from each pending tuple, records any
+   * write errors per tuple, acks every tuple whose messages have all been flushed (successfully or
+   * not) and reports each distinct error to Storm exactly once.
+   * @param sensorType sensor type that was flushed
+   * @param response successes and failures returned by the writer
+   */
+  @Override
+  public void onFlush(String sensorType, BulkWriterResponse response) {
+    LOG.debug("Handling flushed messages for sensor {} with response: {}", sensorType, response);
+
+    // Update tuple message map.  Tuple is ready to ack when all its messages have been flushed.
+    Collection<Tuple> tuplesToAck = new ArrayList<>();
+    tupleMessageMap = tupleMessageMap.entrySet().stream()
+            .map(entry -> {
+              Tuple tuple = entry.getKey();
+              Collection<MessageId> ids = new ArrayList<>(entry.getValue());
+
+              // Remove successful messages from tuple message map
+              ids.removeAll(response.getSuccesses());
+
+              // Remove failed messages from tuple message map
+              response.getErrors().forEach((throwable, failedIds) -> {
+                if (ids.removeAll(failedIds)) {
+                  // Add an error to be reported when a tuple is acked
+                  Set<Throwable> errorList = tupleErrorMap.getOrDefault(tuple, new HashSet<>());
+                  tupleErrorMap.put(tuple, errorList);
+                  errorList.add(throwable);
+                  handleError(sensorType, throwable, tuple);
+                }
+              });
+              return new AbstractMap.SimpleEntry<>(tuple, ids);
+            })
+            .filter(entry -> {
+              // Tuple is ready to be acked when all messages have succeeded/failed
+              if (entry.getValue().isEmpty()) {
+                tuplesToAck.add(entry.getKey());
+
+                // Remove the tuple from tuple message map
+                return false;
+              }
+              return true;
+            })
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    LOG.debug("Acking {} tuples for sensor {}", tuplesToAck.size(), sensorType);
+    tuplesToAck.forEach(tuple -> {
+      collector.ack(tuple);
+    });
+
+    // Determine which tuples failed
+    Collection<Tuple> failedTuples = tuplesToAck.stream()
+            .filter(tuple -> tupleErrorMap.containsKey(tuple))
+            .collect(Collectors.toList());
+    LOG.debug("Failing {} tuple(s) for sensorType {}", failedTuples.size(), sensorType);
+
+    Set<Throwable> errorsToReport = new HashSet<>();
+    failedTuples.forEach(tuple -> {
+      // Add the error to the errorsToReport Set so duplicate errors are removed
+      errorsToReport.addAll(tupleErrorMap.remove(tuple));
+    });
+
+    errorsToReport.forEach(throwable -> {
+      // there is only one error to report for all of the failed tuples
+      collector.reportError(throwable);
+    });
+  }
+
+  /**
+   * Adds a tuple to be acked when all messages have been processed (either as a successful write or a failure).
+   * @param tuple the tuple to track; it is acked once every associated message id has been flushed
+   * @param messageIds ids of the messages produced from this tuple
+   */
+  public void addTupleMessageIds(Tuple tuple, Collection<String> messageIds) {
+    LOG.debug("Adding tuple with messages ids: {}", String.join(",", messageIds));
+    tupleMessageMap.put(tuple, messageIds.stream().map(MessageId::new).collect(Collectors.toSet()));
+  }
+
+  // Builds a MetronError for a failed write and emits it on the error stream.
+  private void handleError(String sensorType, Throwable e, Tuple tuple) {
+    MetronError error = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .addRawMessage(messageGetStrategy.get(tuple));
+    collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+  }
+}
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchSizePolicy.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchSizePolicy.java
new file mode 100644
index 0000000..6153b4c
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchSizePolicy.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+/**
+ * A {@link org.apache.metron.writer.FlushPolicy} implementation that flushes a batch once the
+ * number of cached messages reaches the batch size configured for a sensor type.
+ */
+public class BatchSizePolicy<MESSAGE_T> implements FlushPolicy<MESSAGE_T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * Flushes a batch whenever the number of messages to be written is greater than or equal to configured batch size of
+   * the sensor type.
+   * @param sensorType sensor type
+   * @param configurations writer configurations
+   * @param messages messages to be written
+   * @return true if message batch size is greater than or equal to the configured batch size.
+   */
+  @Override
+  public boolean shouldFlush(String sensorType, WriterConfiguration configurations, List<BulkMessage<MESSAGE_T>> messages) {
+    boolean shouldFlush = false;
+    int batchSize = messages.size();
+    int configuredBatchSize = configurations.getBatchSize(sensorType);
+    //Check for batchSize flush
+    if (batchSize >= configuredBatchSize) {
+      LOG.debug("Batch size of {} reached. Flushing {} messages for sensor {}.", configuredBatchSize, batchSize, sensorType);
+      shouldFlush = true;
+    }
+    return shouldFlush;
+  }
+
+  /**
+   * No-op: this policy keeps no per-sensor state, so there is nothing to reset after a flush.
+   */
+  @Override
+  public void onFlush(String sensorType, BulkWriterResponse response) {
+
+  }
+}
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchTimeoutPolicy.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchTimeoutPolicy.java
new file mode 100644
index 0000000..8edd8ff
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BatchTimeoutPolicy.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link org.apache.metron.writer.FlushPolicy} implementation that flushes a batch when a
+ * configured per-sensor timeout has elapsed since the batch was started, capped by a maximum
+ * timeout supplied at construction time.
+ */
+public class BatchTimeoutPolicy<MESSAGE_T> implements FlushPolicy<MESSAGE_T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  // Upper bound (in seconds) applied to any configured batch timeout
+  private int maxBatchTimeout;
+  // Wall clock; replaceable in tests via the two-arg constructor
+  private Clock clock = new Clock();
+  // Maps sensor type to the absolute deadline (epoch millis) at which its current batch must flush
+  private Map<String, Long> timeouts = new HashMap<>();
+
+  /**
+   * @param maxBatchTimeout maximum allowed batch timeout in seconds; must be greater than 0
+   * @throws IllegalArgumentException if maxBatchTimeout is not greater than 0
+   */
+  public BatchTimeoutPolicy(int maxBatchTimeout) {
+    if (maxBatchTimeout <= 0) {
+      throw new IllegalArgumentException(String.format("The maxBatchTimeout setting is %d but must be greater than 0.", maxBatchTimeout));
+    }
+    LOG.debug("Setting maxBatchTimeout to {}.", maxBatchTimeout);
+    this.maxBatchTimeout = maxBatchTimeout;
+  }
+
+
+  /**
+   * Constructor used in tests to supply a controllable {@link Clock}.
+   * @param maxBatchTimeout maximum allowed batch timeout in seconds; must be greater than 0
+   * @param clock clock used to read the current time
+   */
+  public BatchTimeoutPolicy(int maxBatchTimeout, Clock clock) {
+    this(maxBatchTimeout);
+    this.clock = clock;
+  }
+
+  /**
+   * Manages timeouts for each sensor type and determines when a batch should be flushed.  At the start of a new
+   * batch the timeout value is computed based on the current time and configured timeout value.  Subsequent calls check to
+   * see if the timeout has been reached and flushes if so.  A reset clears the timeout value for that sensor type.
+   * @param sensorType sensor type
+   * @param configurations writer configurations includes timeouts
+   * @param messages messages to be written (not used here)
+   * @return true if the timeout has been reached
+   */
+  @Override
+  public boolean shouldFlush(String sensorType, WriterConfiguration configurations, List<BulkMessage<MESSAGE_T>> messages) {
+    boolean shouldFlush = false;
+    long currentTimeMillis = clock.currentTimeMillis();
+    if (!timeouts.containsKey(sensorType)) {  // no timeout present so assume this is a new batch
+      //This block executes at the beginning of every batch, per sensor.
+      //configurations can change, so (re)init getBatchTimeout(sensorType) at start of every batch
+      long batchTimeoutMs = getBatchTimeout(sensorType, configurations);
+      LOG.debug("Setting batch timeout to {} for sensor {}.", batchTimeoutMs, sensorType);
+      timeouts.put(sensorType, currentTimeMillis + batchTimeoutMs);
+    }
+    if (timeouts.get(sensorType) <= currentTimeMillis) {
+      LOG.debug("Batch timeout of {} reached. Flushing {} messages for sensor {}.",
+              timeouts.get(sensorType), messages.size(), sensorType);
+      shouldFlush = true;
+    }
+    return shouldFlush;
+  }
+
+  /**
+   * Removes the timeout value for a sensor type.  The next call to {@link org.apache.metron.writer.BatchTimeoutPolicy#shouldFlush(String, WriterConfiguration, List)}
+   * will set a new timeout.
+   * @param sensorType sensor type whose timeout is being reset
+   */
+  @Override
+  public void onFlush(String sensorType, BulkWriterResponse response) {
+    timeouts.remove(sensorType);
+  }
+
+  /**
+   * Returns the configured timeout for a sensor type in milliseconds.  The max timeout will be used if the configured timeout is
+   * set to 0 or greater than the max timeout.
+   * @param sensorType sensor type whose timeout is being looked up
+   * @param configurations writer configurations containing the per-sensor batch timeout (in seconds)
+   * @return the effective batch timeout in milliseconds
+   */
+  protected long getBatchTimeout(String sensorType, WriterConfiguration configurations) {
+    int batchTimeoutSecs = configurations.getBatchTimeout(sensorType);
+    if (batchTimeoutSecs <= 0 || batchTimeoutSecs > maxBatchTimeout) {
+      batchTimeoutSecs = maxBatchTimeout;
+    }
+    return TimeUnit.SECONDS.toMillis(batchTimeoutSecs);
+  }
+
+}
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
index ad6d4d1..f4a5c74 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java
@@ -18,330 +18,181 @@
 
 package org.apache.metron.writer;
 
-import static java.lang.String.format;
-
-import com.google.common.collect.Iterables;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.system.Clock;
-import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
+import org.apache.metron.common.writer.MessageId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
- * This component implements message batching, with both flush on queue size, and flush on queue timeout.
- * There is a queue for each sensorType.
- * Ideally each queue would have its own timer, but we only have one global timer, the Tick Tuple
- * generated at fixed intervals by the system and received by the Bolt.  Given this constraint,
- * we use the following strategy:
- *   - The default batchTimeout is, as recommended by Storm, 1/2 the Storm 'topology.message.timeout.secs',
- *   modified by batchTimeoutDivisor, in case multiple batching writers are daisy-chained in one topology.
- *   - If some sensors configure their own batchTimeouts, they are compared with the default.  Batch
- *   timeouts greater than the default will be ignored, because they can cause message recycling in Storm.
- *   Batch timeouts configured to {@literal <}= zero, or undefined, mean use the default.
- *   - The *smallest* configured batchTimeout among all sensor types, greater than zero and less than
- *   the default, will be used to configure the 'topology.tick.tuple.freq.secs' for the Bolt.  If there are no
- *   valid configured batchTimeouts, the defaultBatchTimeout will be used.
- *   - The age of the queue is checked every time a sensor message arrives.  Thus, if at least one message
- *   per second is received for a given sensor, that queue will flush on timeout or sooner, depending on batchSize.
- *   - On each Tick Tuple received, *all* queues will be checked, and if any are older than their respective
- *   batchTimeout, they will be flushed.  Note that this does NOT guarantee timely flushing, depending on the
- *   phase relationship between the queue's batchTimeout and the tick interval.  The maximum age of a queue
- *   before it flushes is its batchTimeout + the tick interval, which is guaranteed to be less than 2x the
- *   batchTimeout, and also less than the 'topology.message.timeout.secs'.  This guarantees that the messages
- *   will not age out of the Storm topology, but it does not guarantee the flush interval requested, for
- *   sensor types not receiving at least one message every second.
+ * This component manages an internal cache of messages to be written in batch.  A separate cache is used for each sensor.
+ * Each time a message is written to this component, the {@link org.apache.metron.writer.FlushPolicy#shouldFlush(String, WriterConfiguration, List)}
+ * method is called for each flush policy to determine if a batch of messages should be flushed.  When a flush does happen,
+ * the {@link org.apache.metron.writer.FlushPolicy#onFlush(String, BulkWriterResponse)} method is called for each flush policy
+ * so that any post-processing (message acknowledgement for example) can be done.  This component also ensures all messages
+ * in a batch are included in the response as either a success or failure.
  *
  * @param <MESSAGE_T>
  */
 public class BulkWriterComponent<MESSAGE_T> {
-  public static final Logger LOG = LoggerFactory
-            .getLogger(BulkWriterComponent.class);
-  private Map<String, Collection<Tuple>> sensorTupleMap = new HashMap<>();
-  private Map<String, List<MESSAGE_T>> sensorMessageMap = new HashMap<>();
-  private Map<String, long[]> batchTimeoutMap = new HashMap<>();
-  private OutputCollector collector;
-  //In test scenarios, defaultBatchTimeout may not be correctly initialized, so do it here.
-  //This is a conservative defaultBatchTimeout for a vanilla bolt with batchTimeoutDivisor=2
-  public static final int UNINITIALIZED_DEFAULT_BATCH_TIMEOUT = 6;
-  private int defaultBatchTimeout = UNINITIALIZED_DEFAULT_BATCH_TIMEOUT;
-  private boolean handleCommit = true;
-  private boolean handleError = true;
-  private static final int LAST_CREATE_TIME_MS = 0; //index zero'th element of long[] in batchTimeoutMap
-  private static final int TIMEOUT_MS = 1;          //index next element of long[] in batchTimeoutMap
-  private Clock clock = new Clock();
-
-  public BulkWriterComponent(OutputCollector collector) {
-    this.collector = collector;
-  }
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private Map<String, List<BulkMessage<MESSAGE_T>>> sensorMessageCache = new HashMap<>();
+  private List<FlushPolicy<MESSAGE_T>> flushPolicies;
 
-  public BulkWriterComponent(OutputCollector collector, boolean handleCommit, boolean handleError) {
-    this(collector);
-    this.handleCommit = handleCommit;
-    this.handleError = handleError;
+  public BulkWriterComponent(int maxBatchTimeout) {
+    flushPolicies = new ArrayList<>();
+    flushPolicies.add(new BatchSizePolicy<>());
+    flushPolicies.add(new BatchTimeoutPolicy<>(maxBatchTimeout));
   }
 
-  /**
-   * Used only for testing.  Overrides the default (actual) wall clock.
-   * @return this mutated BulkWriterComponent
-   */
-   public BulkWriterComponent withClock(Clock clock) {
-    this.clock = clock;
-    return this;
-  }
-
-  public void commit(Iterable<Tuple> tuples) {
-    tuples.forEach(t -> collector.ack(t));
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Acking {} tuples", Iterables.size(tuples));
-    }
+  public BulkWriterComponent(int maxBatchTimeout, Clock clock) {
+    flushPolicies = new ArrayList<>();
+    flushPolicies.add(new BatchSizePolicy<>());
+    flushPolicies.add(new BatchTimeoutPolicy<>(maxBatchTimeout, clock));
   }
 
-  public void commit(BulkWriterResponse response) {
-    commit(response.getSuccesses());
-  }
-
-  public void error(String sensorType, Throwable e, Iterable<Tuple> tuples, MessageGetStrategy messageGetStrategy) {
-    LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(tuples), sensorType), e);
-    tuples.forEach(t -> {
-      MetronError error = new MetronError()
-              .withSensorType(Collections.singleton(sensorType))
-              .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-              .withThrowable(e)
-              .addRawMessage(messageGetStrategy.get(t));
-      collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
-    });
-    if (handleCommit) {
-      commit(tuples);
-    }
-    // there is only one error to report for all of the failed tuples
-    collector.reportError(e);
-
+  protected BulkWriterComponent(List<FlushPolicy<MESSAGE_T>> flushPolicies) {
+    this.flushPolicies = flushPolicies;
   }
 
   /**
-   * Error a set of tuples that may not contain a valid message.
-   *
-   * <p>Without a valid message, the source type is unknown.
-   * <p>Without a valid message, the JSON message cannot be added to the error.
-   *
-   * @param e The exception that occurred.
-   * @param tuple The tuple to error that may not contain a valid message.
+   * Accepts a message to be written and stores it in an internal cache of messages.  Iterates through {@link org.apache.metron.writer.FlushPolicy}
+   * implementations to determine if a batch should be flushed.
+   * @param sensorType sensor type
+   * @param bulkWriterMessage message to be written
+   * @param bulkMessageWriter writer that will do the actual writing
+   * @param configurations writer configurations
    */
-  public void error(Throwable e, Tuple tuple) {
-    LOG.error("Failing tuple", e);
-    MetronError error = new MetronError()
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e);
-    handleError(tuple, error);
-  }
-
-  /**
-   * Errors a set of tuples.
-   *
-   * @param tuple The tuple to error.
-   * @param error
-   */
-  private void handleError(Tuple tuple, MetronError error) {
-    collector.ack(tuple);
-    ErrorUtils.handleError(collector, error);
-  }
-
-  public void error(String sensorType, BulkWriterResponse errors, MessageGetStrategy messageGetStrategy) {
-    Map<Throwable, Collection<Tuple>> errorMap = errors.getErrors();
-    for(Map.Entry<Throwable, Collection<Tuple>> entry : errorMap.entrySet()) {
-      error(sensorType, entry.getKey(), entry.getValue(), messageGetStrategy);
-    }
-  }
-
-  protected Collection<Tuple> createTupleCollection() {
-    return new ArrayList<>();
-  }
-
-  public void errorAll(Throwable e, MessageGetStrategy messageGetStrategy) {
-    for(String key : new HashSet<>(sensorTupleMap.keySet())) {
-      errorAll(key, e, messageGetStrategy);
-    }
-  }
-
-  public void errorAll(String sensorType, Throwable e, MessageGetStrategy messageGetStrategy) {
-    Collection<Tuple> tuples = Optional.ofNullable(sensorTupleMap.get(sensorType)).orElse(new ArrayList<>());
-    error(sensorType, e, tuples, messageGetStrategy);
-    sensorTupleMap.remove(sensorType);
-    sensorMessageMap.remove(sensorType);
-  }
-
-  public void write( String sensorType
-                   , Tuple tuple
-                   , MESSAGE_T message
-                   , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
-                   , WriterConfiguration configurations
-                   , MessageGetStrategy messageGetStrategy
-                   ) throws Exception
+  public void write(String sensorType
+          , BulkMessage<MESSAGE_T> bulkWriterMessage
+          , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+          , WriterConfiguration configurations
+  )
   {
-    if (!configurations.isEnabled(sensorType)) {
-      collector.ack(tuple);
-      return;
-    }
-    int batchSize = configurations.getBatchSize(sensorType);
-
-    if (batchSize <= 1) { //simple case - no batching, no timeouts
-      Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);  //still read in case batchSize changed
-      if (tupleList == null) {
-        tupleList = createTupleCollection();
-      }
-      tupleList.add(tuple);
-
-      List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);  //still read in case batchSize changed
-      if (messageList == null) {
-        messageList = new ArrayList<>();
-      }
-      messageList.add(message);
+    List<BulkMessage<MESSAGE_T>> messages = sensorMessageCache.getOrDefault(sensorType, new ArrayList<>());
+    sensorMessageCache.put(sensorType, messages);
 
-      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
-      return;
-    }
-
-    //Otherwise do the full batch buffering with timeouts
-    long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
-    if (batchTimeoutInfo == null) {
-      //lazily create the batchTimeoutInfo array, once per sensor.
-      batchTimeoutInfo = new long[] {0L, 0L};
-      batchTimeoutMap.put(sensorType, batchTimeoutInfo);
-    }
-
-    Collection<Tuple> tupleList = sensorTupleMap.get(sensorType);
-    if (tupleList == null) {
-      //This block executes at the beginning of every batch, per sensor.
-      tupleList = createTupleCollection();
-      sensorTupleMap.put(sensorType, tupleList);
-      batchTimeoutInfo[LAST_CREATE_TIME_MS] = clock.currentTimeMillis();
-      //configurations can change, so (re)init getBatchTimeout(sensorType) at start of every batch
-      int batchTimeoutSecs = configurations.getBatchTimeout(sensorType);
-      if (batchTimeoutSecs <= 0 || batchTimeoutSecs > defaultBatchTimeout) {
-        batchTimeoutSecs = defaultBatchTimeout;
-      }
-      batchTimeoutInfo[TIMEOUT_MS] = TimeUnit.SECONDS.toMillis(batchTimeoutSecs);
-    }
-    tupleList.add(tuple);
-
-    List<MESSAGE_T> messageList = sensorMessageMap.get(sensorType);
-    if (messageList == null) {
-      messageList = new ArrayList<>();
-      sensorMessageMap.put(sensorType, messageList);
-    }
-    messageList.add(message);
-
-    //Check for batchSize flush
-    if (tupleList.size() >= batchSize) {
-      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
-      return;
-    }
-    //Check for batchTimeout flush (if the tupleList isn't brand new).
-    //Debugging note: If your queue always flushes at length==2 regardless of feed rate,
-    //it may mean defaultBatchTimeout has somehow been set to zero.
-    if (tupleList.size() > 1 && (clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS])) {
-      flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
-      return;
+    // if a sensor type is disabled flush all pending messages and discard the new message
+    if (!configurations.isEnabled(sensorType)) {
+      // flush pending messages
+      flush(sensorType, bulkMessageWriter, configurations, messages);
+
+      // Include the new message for any post-processing but don't write it
+      BulkWriterResponse response = new BulkWriterResponse();
+      response.addSuccess(bulkWriterMessage.getId());
+      onFlush(sensorType, response);
+    } else {
+      messages.add(bulkWriterMessage);
+      applyShouldFlush(sensorType, bulkMessageWriter, configurations, sensorMessageCache.get(sensorType));
     }
   }
 
+  /**
+   * Flushes a batch for a sensor type by writing messages with the supplied {@link org.apache.metron.common.writer.BulkMessageWriter}.
+   * Ensures all message ids in a batch are included in the response. After messages are written the cache is cleared and
+   * flush policies are reset for that sensor type.
+   * @param sensorType sensor type
+   * @param bulkMessageWriter writer that will do the actual writing
+   * @param configurations writer configurations
+   * @param messages messages to be written
+   */
   protected void flush( String sensorType
                     , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
                     , WriterConfiguration configurations
-		                , MessageGetStrategy messageGetStrategy
-                    , Collection<Tuple> tupleList
-                    , List<MESSAGE_T> messageList
-                    ) throws Exception
+                    , List<BulkMessage<MESSAGE_T>> messages
+                    )
   {
     long startTime = System.currentTimeMillis(); //no need to mock, so use real time
-    try {
-      BulkWriterResponse response = bulkMessageWriter.write(sensorType, configurations, tupleList, messageList);
-
-      // Commit or error piecemeal.
-      if(handleCommit) {
-        commit(response);
-      }
-
-      if(handleError) {
-        error(sensorType, response, messageGetStrategy);
-      } else if (response.hasErrors()) {
-        throw new IllegalStateException("Unhandled bulk errors in response: " + response.getErrors());
-      }
+    BulkWriterResponse response = new BulkWriterResponse();
 
-      // Make sure all tuples are acked by acking any tuples not returned in the BulkWriterResponse
-      if (handleCommit) {
-        Set<Tuple> tuplesToAck = new HashSet<>(tupleList);
-        tuplesToAck.removeAll(response.getSuccesses());
-        response.getErrors().values().forEach(tuplesToAck::removeAll);
-        commit(tuplesToAck);
-      }
+    Collection<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toList());
+    try {
+      response = bulkMessageWriter.write(sensorType, configurations, messages);
 
+      // Make sure all ids are included in the BulkWriterResponse
+      ids.removeAll(response.getSuccesses());
+      response.getErrors().values().forEach(ids::removeAll);
+      response.addAllSuccesses(ids);
     } catch (Throwable e) {
-      if(handleError) {
-        error(sensorType, e, tupleList, messageGetStrategy);
-      }
-      else {
-        throw e;
-      }
-    }
-    finally {
-      sensorTupleMap.remove(sensorType);
-      sensorMessageMap.remove(sensorType);
+      response.addAllErrors(e, ids);
+    } finally {
+      onFlush(sensorType, response);
     }
     long endTime = System.currentTimeMillis();
     long elapsed = endTime - startTime;
-    LOG.debug("Flushed batch successfully; sensorType={}, batchSize={}, took={} ms", sensorType, CollectionUtils.size(tupleList), elapsed);
+    LOG.debug("Flushed batch successfully; sensorType={}, batchSize={}, took={} ms", sensorType, CollectionUtils.size(ids), elapsed);
   }
 
-  // Flushes all queues older than their batchTimeouts.
-  public void flushTimeouts(
+  /**
+   * Apply flush policies to all sensors and flush if necessary.
+   * @param bulkMessageWriter writer that will do the actual writing
+   * @param configurations writer configurations
+   */
+  public void flushAll(
             BulkMessageWriter<MESSAGE_T> bulkMessageWriter
           , WriterConfiguration configurations
-          , MessageGetStrategy messageGetStrategy
-          ) throws Exception
+          )
   {
-    // No need to do "all" sensorTypes here, just the ones that have data batched up.
-    // Note queues with batchSize == 1 don't get batched, so they never persist in the sensorTupleMap.
     // Sensors are removed from the sensorTupleMap when flushed so we need to iterate over a copy of sensorTupleMap keys
     // to avoid a ConcurrentModificationException.
-    for (String sensorType : new HashSet<>(sensorTupleMap.keySet())) {
-      long[] batchTimeoutInfo = batchTimeoutMap.get(sensorType);
-      if (batchTimeoutInfo == null  //Shouldn't happen, but conservatively flush if so
-          || clock.currentTimeMillis() - batchTimeoutInfo[LAST_CREATE_TIME_MS] >= batchTimeoutInfo[TIMEOUT_MS]) {
-        flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy
-	            , sensorTupleMap.get(sensorType), sensorMessageMap.get(sensorType));
-      }
+    for (String sensorType : new HashSet<>(sensorMessageCache.keySet())) {
+      applyShouldFlush(sensorType, bulkMessageWriter, configurations, sensorMessageCache.get(sensorType));
     }
   }
 
-  public int getDefaultBatchTimeout() {
-    return defaultBatchTimeout;
+  /**
+   * Add a custom flush policy in addition to the default policies.
+   * @param flushPolicy flush policy
+   */
+  public void addFlushPolicy(FlushPolicy flushPolicy) {
+    this.flushPolicies.add(flushPolicy);
+  }
+
+  /**
+   * Checks each flush policy to determine if a batch should be flushed.  A batch is flushed and the remaining policies
+   * are skipped when a policy returns true.
+   * @param sensorType sensor type
+   * @param bulkMessageWriter writer that will do the actual writing
+   * @param configurations writer configurations
+   * @param messages messages to be written
+   */
+  private void applyShouldFlush(String sensorType
+          , BulkMessageWriter<MESSAGE_T> bulkMessageWriter
+          , WriterConfiguration configurations
+          , List<BulkMessage<MESSAGE_T>> messages) {
+    if (messages.size() > 0) { // no need to flush empty batches
+      for(FlushPolicy<MESSAGE_T> flushPolicy: flushPolicies) {
+        if (flushPolicy.shouldFlush(sensorType, configurations, messages)) {
+          flush(sensorType, bulkMessageWriter, configurations, messages);
+          break;
+        }
+      }
+    }
   }
 
   /**
-   * @param defaultBatchTimeout
+   * Called after a batch is flushed.  The message cache is cleared and the {@link org.apache.metron.writer.FlushPolicy#onFlush(String, BulkWriterResponse)}
+   * method is called for each flush policy.
+   * @param sensorType sensor type
+   * @param response response from a bulk write call
    */
-  public void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    this.defaultBatchTimeout = defaultBatchTimeout;
+  private void onFlush(String sensorType, BulkWriterResponse response) {
+    sensorMessageCache.remove(sensorType);
+    for(FlushPolicy flushPolicy: flushPolicies) {
+      flushPolicy.onFlush(sensorType, response);
+    }
   }
 }
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/FlushPolicy.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/FlushPolicy.java
new file mode 100644
index 0000000..dae9a30
--- /dev/null
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/FlushPolicy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.BulkWriterResponse;
+
+import java.util.List;
+
+/**
+ * This interface is used by the {@link org.apache.metron.writer.BulkWriterComponent} to determine if a batch should be flushed
+ * and handle the {@link org.apache.metron.common.writer.BulkWriterResponse} when a batch is flushed.
+ */
+public interface FlushPolicy<MESSAGE_T> {
+
+  /**
+   * This method is called whenever messages are passed to {@link BulkWriterComponent#write(String, BulkMessage, BulkMessageWriter, WriterConfiguration)}.
+   * Each implementation of {@link org.apache.metron.writer.FlushPolicy#shouldFlush(String, WriterConfiguration, List)} will be called in order
+   * and the first one to return true will trigger a flush; the remaining policies are then skipped.
+   * @param sensorType sensor type
+   * @param configurations configurations
+   * @param messages messages to be written
+   * @return true if batch should be flushed
+   */
+  boolean shouldFlush(String sensorType, WriterConfiguration configurations, List<BulkMessage<MESSAGE_T>> messages);
+
+  /**
+   * This method is called after a flush happens.  It can be used to clear any internal state a {@link org.apache.metron.writer.FlushPolicy}
+   * maintains to determine if a batch should be flushed.  This method is called for all {@link org.apache.metron.writer.FlushPolicy}
+   * implementations after a batch is flushed with {@link org.apache.metron.writer.BulkWriterComponent#flush(String, BulkMessageWriter, WriterConfiguration, List)}.
+   * @param sensorType sensor type
+   */
+  void onFlush(String sensorType, BulkWriterResponse response);
+}
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
index 0e73a3b..01def5d 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/NoopWriter.java
@@ -17,8 +17,9 @@
  */
 package org.apache.metron.writer;
 
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -29,8 +30,10 @@ import org.json.simple.JSONObject;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class NoopWriter extends AbstractWriter implements BulkMessageWriter<JSONObject> {
 
@@ -131,13 +134,13 @@ public class NoopWriter extends AbstractWriter implements BulkMessageWriter<JSON
   }
 
   @Override
-  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
     if(sleepFunction != null) {
       sleepFunction.apply(null);
     }
-
+    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
     BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tuples);
+    response.addAllSuccesses(ids);
     return response;
   }
 
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
index 7d7eae5..709875a 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/WriterToBulkWriter.java
@@ -18,8 +18,9 @@
 
 package org.apache.metron.writer;
 
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import com.google.common.collect.Iterables;
 import org.apache.metron.common.configuration.writer.SingleBatchConfigurationFacade;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
@@ -30,7 +31,9 @@ import org.apache.metron.common.writer.BulkWriterResponse;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_T>, Serializable {
   MessageWriter<MESSAGE_T> messageWriter;
@@ -46,21 +49,22 @@ public class WriterToBulkWriter<MESSAGE_T> implements BulkMessageWriter<MESSAGE_
   }
 
   @Override
-  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<MESSAGE_T> messages) throws Exception {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, List<BulkMessage<MESSAGE_T>> messages) throws Exception {
+    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
     BulkWriterResponse response = new BulkWriterResponse();
     if(messages.size() > 1) {
-        response.addAllErrors(new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1"), tuples);
+        response.addAllErrors(new IllegalStateException("WriterToBulkWriter expects a batch of exactly 1"), ids);
         return response;
     }
 
     try {
-      messageWriter.write(sensorType, configurations, Iterables.getFirst(tuples, null), Iterables.getFirst(messages, null));
+      messageWriter.write(sensorType, configurations, Iterables.getFirst(messages, null));
     } catch(Exception e) {
-      response.addAllErrors(e, tuples);
+      response.addAllErrors(e, ids);
       return response;
     }
 
-    response.addAllSuccesses(tuples);
+    response.addAllSuccesses(ids);
     return response;
   }
 
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
index 195e010..52e1b90 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BatchTimeoutHelper.java
@@ -29,7 +29,7 @@ import java.util.function.Supplier;
 /**
  * Routines to help figure out the effective batchTimeout(s), using information from
  * multiple configuration sources, topology.message.timeout.secs, and batchTimeoutDivisor,
- * and use it to calculate defaultBatchTimeout and appropriate topology.tick.tuple.freq.secs.
+ * and use it to calculate maxBatchTimeout and appropriate topology.tick.tuple.freq.secs.
  *
  * These methods cause no side effects outside of setting the internal member variables.
  * "base" config are from defaults and storm.yaml (subordinate to Component config settings)
@@ -108,7 +108,7 @@ public class BatchTimeoutHelper {
   }
 
   private void calcMaxBatchTimeoutAllowed() {
-    // The max batchTimeout allowed also becomes the default batchTimeout.
+    // The max batchTimeout allowed also becomes the max batchTimeout.
     effectiveMessageTimeoutSecs = (cliMessageTimeoutSecs == 0 ? baseMessageTimeoutSecs : cliMessageTimeoutSecs);
     if (effectiveMessageTimeoutSecs == 0) {
       LOG.info("topology.message.timeout.secs is disabled in both Storm config and CLI.  Allowing unlimited batchTimeouts.");
@@ -130,7 +130,7 @@ public class BatchTimeoutHelper {
    * @return the max batchTimeout allowed, in seconds
    * Guaranteed positive number.
    */
-  public int getDefaultBatchTimeout() {
+  public int getMaxBatchTimeout() {
     if (!initialized) {this.init();}
     return maxBatchTimeoutAllowedSecs;
   }
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
index 590ab8c..c9215e3 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java
@@ -17,22 +17,31 @@
  */
 package org.apache.metron.writer.bolt;
 
+import static java.lang.String.format;
 import static org.apache.storm.utils.TupleUtils.isTick;
 
 import com.google.common.collect.ImmutableList;
+
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.function.Function;
+
+import com.google.common.collect.Iterables;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.bolt.ConfiguredBolt;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.error.MetronError;
 import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.utils.MessageUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.MessageWriter;
+import org.apache.metron.writer.AckTuplesPolicy;
 import org.apache.metron.writer.BulkWriterComponent;
 import org.apache.metron.writer.WriterToBulkWriter;
 import org.apache.storm.Config;
@@ -41,10 +50,37 @@ import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * This bolt implements message batching and writing, with both flush on queue size, and flush on queue timeout.
+ * There is a queue for each sensorType.
+ * Ideally each queue would have its own timer, but we only have one global timer, the Tick Tuple
+ * generated at fixed intervals by the system and received by the Bolt.  Given this constraint,
+ * we use the following strategy:
+ *   - The default batchTimeout is, as recommended by Storm, 1/2 the Storm 'topology.message.timeout.secs',
+ *   modified by batchTimeoutDivisor, in case multiple batching writers are daisy-chained in one topology.
+ *   - If some sensors configure their own batchTimeouts, they are compared with the default.  Batch
+ *   timeouts greater than the default will be ignored, because they can cause message recycling in Storm.
+ *   Batch timeouts configured to {@literal <}= zero, or undefined, mean use the default.
+ *   - The *smallest* configured batchTimeout among all sensor types, greater than zero and less than
+ *   the default, will be used to configure the 'topology.tick.tuple.freq.secs' for the Bolt.  If there are no
+ *   valid configured batchTimeouts, the maxBatchTimeout will be used.
+ *   - The age of the queue is checked every time a sensor message arrives.  Thus, if at least one message
+ *   per second is received for a given sensor, that queue will flush on timeout or sooner, depending on batchSize.
+ *   - On each Tick Tuple received, *all* queues will be checked, and if any are older than their respective
+ *   batchTimeout, they will be flushed.  Note that this does NOT guarantee timely flushing, depending on the
+ *   phase relationship between the queue's batchTimeout and the tick interval.  The maximum age of a queue
+ *   before it flushes is its batchTimeout + the tick interval, which is guaranteed to be less than 2x the
+ *   batchTimeout, and also less than the 'topology.message.timeout.secs'.  This guarantees that the messages
+ *   will not age out of the Storm topology, but it does not guarantee the flush interval requested, for
+ *   sensor types not receiving at least one message every second.
+ *
+ * @param <CONFIG_T>
+ */
 public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends ConfiguredBolt<CONFIG_T> {
 
   private static final Logger LOG = LoggerFactory
@@ -57,8 +93,9 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
   private transient OutputCollector collector;
   private transient Function<WriterConfiguration, WriterConfiguration> configurationTransformation = null;
   private int requestedTickFreqSecs;
-  private int defaultBatchTimeout;
+  private int maxBatchTimeout;
   private int batchTimeoutDivisor = 1;
+  private transient AckTuplesPolicy ackTuplesPolicy = null;
 
   public BulkMessageWriterBolt(String zookeeperUrl, String configurationStrategy) {
     super(zookeeperUrl, configurationStrategy);
@@ -112,17 +149,17 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
 
   /**
    * Used only for unit testing
-   * @param defaultBatchTimeout
+   * @param maxBatchTimeout
    */
-  protected void setDefaultBatchTimeout(int defaultBatchTimeout) {
-    this.defaultBatchTimeout = defaultBatchTimeout;
+  protected void setMaxBatchTimeout(int maxBatchTimeout) {
+    this.maxBatchTimeout = maxBatchTimeout;
   }
 
   /**
    * Used only for unit testing
    */
-  public int getDefaultBatchTimeout() {
-    return defaultBatchTimeout;
+  public int getMaxBatchTimeout() {
+    return maxBatchTimeout;
   }
 
   public BulkWriterComponent<JSONObject> getWriterComponent() {
@@ -156,8 +193,8 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
 
     BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
     this.requestedTickFreqSecs = timeoutHelper.getRecommendedTickInterval();
-    //And while we've got BatchTimeoutHelper handy, capture the defaultBatchTimeout for writerComponent.
-    this.defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+    //And while we've got BatchTimeoutHelper handy, capture the maxBatchTimeout for writerComponent.
+    this.maxBatchTimeout = timeoutHelper.getMaxBatchTimeout();
 
     Map<String, Object> conf = super.getComponentConfiguration();
     if (conf == null) {
@@ -172,7 +209,6 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
 
   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-    setWriterComponent(new BulkWriterComponent<>(collector));
     this.collector = collector;
     super.prepare(stormConf, context, collector);
     if (messageGetField != null) {
@@ -186,16 +222,19 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
     else {
       configurationTransformation = x -> x;
     }
+    ackTuplesPolicy = new AckTuplesPolicy(collector, messageGetStrategy);
     try {
       WriterConfiguration writerconf = configurationTransformation
           .apply(getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations()));
-      if (defaultBatchTimeout == 0) {
-        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
+      if (maxBatchTimeout == 0) {
+        //This means getComponentConfiguration was never called to initialize maxBatchTimeout,
         //probably because we are in a unit test scenario.  So calculate it here.
         BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
-        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
+        maxBatchTimeout = timeoutHelper.getMaxBatchTimeout();
       }
-      getWriterComponent().setDefaultBatchTimeout(defaultBatchTimeout);
+      BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(maxBatchTimeout);
+      bulkWriterComponent.addFlushPolicy(ackTuplesPolicy);
+      setWriterComponent(bulkWriterComponent);
       bulkMessageWriter.init(stormConf, context, writerconf);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -207,7 +246,9 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
    */
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector, Clock clock) {
     prepare(stormConf, context, collector);
-    getWriterComponent().withClock(clock);
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(maxBatchTimeout, clock);
+    bulkWriterComponent.addFlushPolicy(ackTuplesPolicy);
+    setWriterComponent(bulkWriterComponent);
   }
 
   @SuppressWarnings("unchecked")
@@ -218,9 +259,8 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
         if (!(bulkMessageWriter instanceof WriterToBulkWriter)) {
           //WriterToBulkWriter doesn't allow batching, so no need to flush on Tick.
           LOG.debug("Flushing message queues older than their batchTimeouts");
-          getWriterComponent().flushTimeouts(bulkMessageWriter, configurationTransformation.apply(
-              getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())),
-              messageGetStrategy);
+          getWriterComponent().flushAll(bulkMessageWriter, configurationTransformation.apply(
+              getConfigurationStrategy().createWriterConfig(bulkMessageWriter, getConfigurations())));
         }
       }
       catch(Exception e) {
@@ -254,13 +294,12 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
         //want to warn, but not fail the tuple
         collector.reportError(new Exception("WARNING: Default and (likely) unoptimized writer config used for " + bulkMessageWriter.getName() + " writer and sensor " + sensorType));
       }
-
+      String messagesId = MessageUtils.getGuid(message);
+      ackTuplesPolicy.addTupleMessageIds(tuple, Collections.singleton(messagesId));
       getWriterComponent().write(sensorType
-              , tuple
-              , message
+              , new BulkMessage<>(messagesId, message)
               , bulkMessageWriter
               , writerConfiguration
-              , messageGetStrategy
       );
     }
     catch(Exception e) {
@@ -295,11 +334,19 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
   private void handleMissingSensorType(Tuple tuple, JSONObject message) {
     // sensor type somehow ended up being null.  We want to error this message directly.
     LOG.debug("Message is missing sensor type");
-    getWriterComponent().error("null",
-            new Exception("Sensor type is not specified for message " + message.toJSONString()),
-            ImmutableList.of(tuple),
-            messageGetStrategy
-    );
+    String sensorType = "null";
+    Exception e = new Exception("Sensor type is not specified for message " + message.toJSONString());
+    LOG.error(format("Failing %d tuple(s); sensorType=%s", Iterables.size(ImmutableList.of(tuple)), sensorType), e);
+    MetronError error = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e)
+            .addRawMessage(messageGetStrategy.get(tuple));
+    collector.emit(Constants.ERROR_STREAM, new Values(error.getJSONObject()));
+
+    // there is only one error to report for all of the failed tuples
+    collector.reportError(e);
+    collector.ack(tuple);
   }
 
   /**
@@ -309,10 +356,13 @@ public class BulkMessageWriterBolt<CONFIG_T extends Configurations> extends Conf
    */
   private void handleMissingMessage(Tuple tuple) {
     LOG.debug("Unable to extract message from tuple; expected valid JSON");
-    getWriterComponent().error(
-            new Exception("Unable to extract message from tuple; expected valid JSON"),
-            tuple
-    );
+    Exception e = new Exception("Unable to extract message from tuple; expected valid JSON");
+    LOG.error("Failing tuple", e);
+    MetronError error = new MetronError()
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
+            .withThrowable(e);
+    collector.ack(tuple);
+    ErrorUtils.handleError(collector, error);
   }
 
   @Override
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index aaa58fa..9e6827b 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -24,10 +24,15 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.stellar.common.StellarProcessor;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.MapVariableResolver;
@@ -40,7 +45,6 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
 import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.hdfs.common.rotation.RotationAction;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,40 +105,38 @@ public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
   }
 
   @Override
-  public BulkWriterResponse write(String sourceType
-                   , WriterConfiguration configurations
-                   , Iterable<Tuple> tuples
-                   , List<JSONObject> messages
-                   ) throws Exception {
+  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, List<BulkMessage<JSONObject>> messages) throws Exception {
     BulkWriterResponse response = new BulkWriterResponse();
+    Set<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toSet());
 
     // Currently treating all the messages in a group for pass/failure.
     // Messages can all result in different HDFS paths, because of Stellar Expressions, so we'll need to iterate through
-    for (JSONObject message : messages) {
+    for (BulkMessage<JSONObject> bulkWriterMessage : messages) {
+      JSONObject message = bulkWriterMessage.getMessage();
       String path = getHdfsPathExtension(
-          sourceType,
-          (String) configurations.getSensorConfig(sourceType)
-              .getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
-          message
+              sensorType,
+              (String) configurations.getSensorConfig(sensorType)
+                      .getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF, ""),
+              message
       );
 
       try {
         LOG.trace("Writing message {} to path: {}", message.toJSONString(), path);
-        SourceHandler handler = getSourceHandler(sourceType, path, configurations);
-        handler.handle(message, sourceType, configurations, syncPolicyCreator);
+        SourceHandler handler = getSourceHandler(sensorType, path, configurations);
+        handler.handle(message, sensorType, configurations, syncPolicyCreator);
       } catch (Exception e) {
         LOG.error(
-            "HdfsWriter encountered error writing. Source type: {}. # messages: {}. Output path: {}.",
-            sourceType,
-            messages.size(),
-            path,
-            e
+                "HdfsWriter encountered error writing. Source type: {}. # messages: {}. Output path: {}.",
+                sensorType,
+                messages.size(),
+                path,
+                e
         );
-        response.addAllErrors(e, tuples);
+        response.addAllErrors(e, ids);
       }
     }
 
-    response.addAllSuccesses(tuples);
+    response.addAllSuccesses(ids);
     return response;
   }
 
diff --git a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 599ecbd..78a27fd 100644
--- a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -22,11 +22,14 @@ import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.AbstractMap;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -36,11 +39,12 @@ import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.common.utils.StringUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.metron.writer.AbstractWriter;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -151,6 +155,13 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
     return Optional.ofNullable(configPrefix);
   }
 
+  /**
+   * Used only for unit testing
+   */
+  protected void setKafkaProducer(KafkaProducer kafkaProducer) {
+    this.kafkaProducer = kafkaProducer;
+  }
+
   @Override
   public void configure(String sensorName, WriterConfiguration configuration) {
     Map<String, Object> configMap = configuration.getSensorConfig(sensorName);
@@ -228,17 +239,17 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
 
   @Override
   public BulkWriterResponse write(String sensorType, WriterConfiguration configurations,
-      Iterable<Tuple> tuples, List<JSONObject> messages) {
+                                  List<BulkMessage<JSONObject>> messages) {
     BulkWriterResponse writerResponse = new BulkWriterResponse();
-    List<Map.Entry<Tuple, Future>> results = new ArrayList<>();
-    int i = 0;
-    for (Tuple tuple : tuples) {
-      JSONObject message = messages.get(i++);
+    List<Map.Entry<MessageId, Future>> results = new ArrayList<>();
+    for (BulkMessage<JSONObject> bulkWriterMessage: messages) {
+      MessageId messageId = bulkWriterMessage.getId();
+      JSONObject message = bulkWriterMessage.getMessage();
       String jsonMessage;
       try {
          jsonMessage = message.toJSONString();
       } catch (Throwable t) {
-        writerResponse.addError(t, tuple);
+        writerResponse.addError(t, messageId);
         continue;
       }
       Optional<String> topic = getKafkaTopic(message);
@@ -246,22 +257,23 @@ public class KafkaWriter extends AbstractWriter implements BulkMessageWriter<JSO
         Future future = kafkaProducer
             .send(new ProducerRecord<String, String>(topic.get(), jsonMessage));
         // we want to manage the batching
-        results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+        results.add(new AbstractMap.SimpleEntry<>(messageId, future));
       }
       else {
         LOG.debug("Dropping {} because no topic is specified.", jsonMessage);
       }
     }
 
+    Collection<MessageId> ids = messages.stream().map(BulkMessage::getId).collect(Collectors.toList());
     try {
       // ensures all Future.isDone() == true
       kafkaProducer.flush();
     } catch (InterruptException e) {
-      writerResponse.addAllErrors(e, tuples);
+      writerResponse.addAllErrors(e, ids);
       return writerResponse;
     }
 
-    for (Map.Entry<Tuple, Future> kv : results) {
+    for (Map.Entry<MessageId, Future> kv : results) {
       try {
         kv.getValue().get();
         writerResponse.addSuccess(kv.getKey());
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
new file mode 100644
index 0000000..1d970cb
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/AckTuplesPolicyTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.error.MetronError;
+import org.apache.metron.common.message.MessageGetStrategy;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
+import org.apache.metron.test.error.MetronErrorJSONMatcher;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+
+public class AckTuplesPolicyTest {
+
+
+  @Mock
+  private OutputCollector collector;
+
+  @Mock
+  private MessageGetStrategy messageGetStrategy;
+
+  @Mock
+  private Tuple tuple1;
+
+  @Mock
+  private Tuple tuple2;
+
+  private String sensorType = "testSensor";
+
+  private AckTuplesPolicy ackTuplesPolicy;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    ackTuplesPolicy = new AckTuplesPolicy(collector, messageGetStrategy);
+  }
+
+  @Test
+  public void shouldProperlyHandleSuccessAndErrors() throws Exception {
+    String messageId1 = "messageId1";
+    String messageId2 = "messageId2";
+    String messageId3 = "messageId3";
+    JSONObject message1 = new JSONObject();
+    JSONObject message2 = new JSONObject();
+    JSONObject message3 = new JSONObject();
+    message1.put("value", "message1");
+    message2.put("value", "message2");
+    message3.put("value", "message3");
+    Tuple tuple3 = mock(Tuple.class);
+    Throwable e = new Exception("test exception");
+    MetronError expectedError1 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
+    MetronError expectedError2 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllErrors(e, Arrays.asList(new MessageId(messageId1), new MessageId(messageId2)));
+    response.addSuccess(new MessageId(messageId3));
+
+    when(messageGetStrategy.get(tuple1)).thenReturn(message1);
+    when(messageGetStrategy.get(tuple2)).thenReturn(message2);
+
+    ackTuplesPolicy.addTupleMessageIds(tuple1, Collections.singleton(messageId1));
+    ackTuplesPolicy.addTupleMessageIds(tuple2, Collections.singleton(messageId2));
+    ackTuplesPolicy.addTupleMessageIds(tuple3, Collections.singleton(messageId3));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+
+    assertEquals(0, ackTuplesPolicy.getTupleMessageMap().size());
+    assertEquals(0, ackTuplesPolicy.getTupleErrorMap().size());
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
+            new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).ack(tuple3);
+    verify(collector, times(1)).reportError(e);
+    verifyNoMoreInteractions(collector);
+  }
+
+  @Test
+  public void shouldOnlyReportErrorsOncePerBatch() {
+    AckTuplesPolicy ackTuplesPolicy = new AckTuplesPolicy(collector, messageGetStrategy);
+    JSONObject rawMessage1 = new JSONObject();
+    JSONObject rawMessage2 = new JSONObject();
+    rawMessage1.put("value", "rawMessage1");
+    rawMessage2.put("value", "rawMessage2");
+    String messageId1 = "messageId1";
+    String messageId2 = "messageId2";
+    String messageId3 = "messageId3";
+    JSONObject message1 = new JSONObject();
+    JSONObject message2 = new JSONObject();
+    JSONObject message3 = new JSONObject();
+    message1.put("value", "message1");
+    message2.put("value", "message2");
+    message3.put("value", "message3");
+
+    Throwable e1 = new Exception("test exception 1");
+    Throwable e2 = new Exception("test exception 2");
+    MetronError expectedError1 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e1).withRawMessages(Collections.singletonList(rawMessage1));
+    MetronError expectedError2 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e2).withRawMessages(Collections.singletonList(rawMessage1));
+    MetronError expectedError3 = new MetronError()
+            .withSensorType(Collections.singleton(sensorType))
+            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e1).withRawMessages(Collections.singletonList(rawMessage2));
+
+    when(messageGetStrategy.get(tuple1)).thenReturn(rawMessage1);
+    when(messageGetStrategy.get(tuple2)).thenReturn(rawMessage2);
+
+    ackTuplesPolicy.addTupleMessageIds(tuple1, Arrays.asList(messageId1, messageId2));
+    ackTuplesPolicy.addTupleMessageIds(tuple2, Collections.singletonList(messageId3));
+
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addError(e1, new MessageId(messageId1));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+
+    assertEquals(2, ackTuplesPolicy.getTupleMessageMap().size());
+    assertEquals(1, ackTuplesPolicy.getTupleErrorMap().size());
+    verify(collector, times(0)).ack(any());
+    verify(collector, times(0)).reportError(any());
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
+
+    response = new BulkWriterResponse();
+    response.addError(e2, new MessageId(messageId2));
+    response.addError(e1, new MessageId(messageId3));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+
+    assertEquals(0, ackTuplesPolicy.getTupleMessageMap().size());
+    assertEquals(0, ackTuplesPolicy.getTupleErrorMap().size());
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verify(collector, times(1)).reportError(e1);
+    verify(collector, times(1)).reportError(e2);
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
+    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), new Values(argThat(new MetronErrorJSONMatcher(expectedError3.getJSONObject()))));
+    verifyNoMoreInteractions(collector);
+  }
+
+  @Test
+  public void shouldProperlyAckTuples() {
+    ackTuplesPolicy.addTupleMessageIds(tuple1, Collections.singletonList("message1"));
+    ackTuplesPolicy.addTupleMessageIds(tuple2, Collections.singletonList("message2"));
+
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addSuccess(new MessageId("message1"));
+    response.addSuccess(new MessageId("message2"));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+
+    assertEquals(0, ackTuplesPolicy.getTupleMessageMap().size());
+    verify(collector, times(1)).ack(tuple1);
+    verify(collector, times(1)).ack(tuple2);
+    verifyNoMoreInteractions(collector);
+  }
+
+  @Test
+  public void shouldOnlyAckTupleAfterHandlingAllMessages() {
+    ackTuplesPolicy.addTupleMessageIds(tuple1, Arrays.asList("message1", "message2", "message3"));
+
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addSuccess(new MessageId("message1"));
+    response.addSuccess(new MessageId("message2"));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+    verify(collector, times(0)).ack(any());
+
+    response = new BulkWriterResponse();
+    response.addSuccess(new MessageId("message3"));
+
+    ackTuplesPolicy.onFlush(sensorType, response);
+
+    assertEquals(0, ackTuplesPolicy.getTupleMessageMap().size());
+    verify(collector, times(1)).ack(tuple1);
+    verifyNoMoreInteractions(collector);
+  }
+}
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchSizePolicyTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchSizePolicyTest.java
new file mode 100644
index 0000000..aab8a73
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchSizePolicyTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BatchSizePolicyTest {
+
+  private String sensorType = "sensorType";
+  private WriterConfiguration configurations = mock(WriterConfiguration.class);
+  private List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+
+  @Before
+  public void setup() {
+    when(configurations.getBatchSize(sensorType)).thenReturn(2);
+  }
+
+  @Test
+  public void shouldFlushWhenBatchSizeReached() {
+    BatchSizePolicy<JSONObject> batchSizePolicy = new BatchSizePolicy<>();
+
+    messages.add(new BulkMessage<>("message1", new JSONObject()));
+    messages.add(new BulkMessage<>("message2", new JSONObject()));
+    assertTrue(batchSizePolicy.shouldFlush(sensorType, configurations, messages));
+  }
+
+  @Test
+  public void shouldNotFlushWhenBatchSizeNotReached() {
+    BatchSizePolicy<JSONObject> batchSizePolicy = new BatchSizePolicy<>();
+
+    messages.add(new BulkMessage<>("message1", new JSONObject()));
+    assertFalse(batchSizePolicy.shouldFlush(sensorType, configurations, messages));
+  }
+
+  @Test
+  public void shouldFlushWhenBatchSizeExceeded() {
+    BatchSizePolicy<JSONObject> batchSizePolicy = new BatchSizePolicy<>();
+
+    messages.add(new BulkMessage<>("message1", new JSONObject()));
+    messages.add(new BulkMessage<>("message2", new JSONObject()));
+    messages.add(new BulkMessage<>("message3", new JSONObject()));
+    assertTrue(batchSizePolicy.shouldFlush(sensorType, configurations, messages));
+  }
+}
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchTimeoutPolicyTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchTimeoutPolicyTest.java
new file mode 100644
index 0000000..d29c373
--- /dev/null
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BatchTimeoutPolicyTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer;
+
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.system.Clock;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.BulkMessage;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class BatchTimeoutPolicyTest {
+
+  private String sensor1 = "sensor1";
+  private String sensor2 = "sensor2";
+  private WriterConfiguration configurations = mock(WriterConfiguration.class);
+  private int maxBatchTimeout = 6;
+  private List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+
+  @Test
+  public void shouldFlushSensorsOnTimeouts() {
+    Clock clock = mock(Clock.class);
+
+    BatchTimeoutPolicy batchTimeoutPolicy = new BatchTimeoutPolicy<>(maxBatchTimeout, clock);
+    when(configurations.getBatchTimeout(sensor1)).thenReturn(1);
+    when(configurations.getBatchTimeout(sensor2)).thenReturn(2);
+
+    when(clock.currentTimeMillis()).thenReturn(0L); // initial check
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor2, configurations, messages));
+
+    when(clock.currentTimeMillis()).thenReturn(999L); // no timeouts yet
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor2, configurations, messages));
+
+    when(clock.currentTimeMillis()).thenReturn(1000L); // first sensor timeout reached
+    assertTrue(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor2, configurations, messages));
+
+    when(clock.currentTimeMillis()).thenReturn(2000L); // second sensor timeout reached
+    assertTrue(batchTimeoutPolicy.shouldFlush(sensor2, configurations, messages));
+  }
+
+  @Test
+  public void shouldResetTimeouts() {
+    Clock clock = mock(Clock.class);
+
+    BatchTimeoutPolicy batchTimeoutPolicy = new BatchTimeoutPolicy(maxBatchTimeout, clock);
+    when(configurations.getBatchTimeout(sensor1)).thenReturn(1);
+
+    when(clock.currentTimeMillis()).thenReturn(0L); // initial check
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+
+    batchTimeoutPolicy.onFlush(sensor1, new BulkWriterResponse());
+
+    when(clock.currentTimeMillis()).thenReturn(1000L); // sensor was reset so shouldn't timeout
+    assertFalse(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+
+    when(clock.currentTimeMillis()).thenReturn(2000L); // sensor timeout should be 2 now
+    assertTrue(batchTimeoutPolicy.shouldFlush(sensor1, configurations, messages));
+  }
+
+  @Test
+  public void getBatchTimeoutShouldReturnConfiguredTimeout() {
+    BatchTimeoutPolicy batchTimeoutPolicy = new BatchTimeoutPolicy(maxBatchTimeout);
+
+    when(configurations.getBatchTimeout(sensor1)).thenReturn(5);
+
+    assertEquals(5000L, batchTimeoutPolicy.getBatchTimeout(sensor1, configurations));
+  }
+
+  @Test
+  public void getBatchTimeoutShouldReturnMaxBatchTimeout() {
+    BatchTimeoutPolicy batchTimeoutPolicy = new BatchTimeoutPolicy(maxBatchTimeout);
+
+    when(configurations.getBatchTimeout(sensor1)).thenReturn(0);
+
+    assertEquals(maxBatchTimeout * 1000, batchTimeoutPolicy.getBatchTimeout(sensor1, configurations));
+  }
+}
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
index 754a650..16f3b4f 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/BulkWriterComponentTest.java
@@ -17,38 +17,28 @@
  */
 package org.apache.metron.writer;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.powermock.api.mockito.PowerMockito.verifyStatic;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
+
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.error.MetronError;
-import org.apache.metron.common.message.MessageGetStrategy;
-import org.apache.metron.common.system.Clock;
 import org.apache.metron.common.utils.ErrorUtils;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.metron.test.error.MetronErrorJSONMatcher;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
+import org.apache.metron.common.writer.MessageId;
 import org.json.simple.JSONObject;
 import org.junit.Before;
 import org.junit.Rule;
@@ -68,7 +58,7 @@ public class BulkWriterComponentTest {
   public final ExpectedException exception = ExpectedException.none();
 
   @Mock
-  private OutputCollector collector;
+  private FlushPolicy<JSONObject> flushPolicy;
 
   @Mock
   private BulkMessageWriter<JSONObject> bulkMessageWriter;
@@ -76,17 +66,11 @@ public class BulkWriterComponentTest {
   @Mock
   private WriterConfiguration configurations;
 
-  @Mock
-  private Tuple tuple1;
-
-  @Mock
-  private Tuple tuple2;
-
-  @Mock
-  private MessageGetStrategy messageGetStrategy;
-
+  private MessageId messageId1 = new MessageId("messageId1");
+  private MessageId messageId2 = new MessageId("messageId2");
   private String sensorType = "testSensor";
-  private List<Tuple> tupleList;
+  private List<MessageId> messageIds;
+  private List<BulkMessage<JSONObject>> messages;
   private JSONObject message1 = new JSONObject();
   private JSONObject message2 = new JSONObject();
 
@@ -96,256 +80,185 @@ public class BulkWriterComponentTest {
     mockStatic(ErrorUtils.class);
     message1.put("value", "message1");
     message2.put("value", "message2");
-    when(tuple1.getValueByField("message")).thenReturn(message1);
-    when(tuple2.getValueByField("message")).thenReturn(message2);
-    tupleList = Arrays.asList(tuple1, tuple2);
+    messageIds = Arrays.asList(messageId1, messageId2);
+    messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage<>(messageId1, message1));
+      add(new BulkMessage<>(messageId2, message2));
+    }};
     when(configurations.isEnabled(any())).thenReturn(true);
-    when(configurations.getBatchSize(any())).thenReturn(2);
-    when(messageGetStrategy.get(tuple1)).thenReturn(message1);
-    when(messageGetStrategy.get(tuple2)).thenReturn(message2);
   }
 
   @Test
   public void writeShouldProperlyAckTuplesInBatch() throws Exception {
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
     BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tupleList);
+    response.addAllSuccesses(messageIds);
+
+    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenReturn(response);
 
-    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
+    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);
 
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
-    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
+    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
+    verify(flushPolicy, times(0)).onFlush(any(), any());
 
-    verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
-    verify(collector, times(0)).ack(tuple1);
-    verify(collector, times(0)).ack(tuple2);
+    reset(flushPolicy);
 
-    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);
 
-    verify(collector, times(1)).ack(tuple1);
-    verify(collector, times(1)).ack(tuple2);
+    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);
 
-    // A disabled writer should still ack
-    Tuple disabledTuple = mock(Tuple.class);
-    String disabledSensorType = "disabled";
-    when(configurations.isEnabled(disabledSensorType)).thenReturn(false);
-    bulkWriterComponent.write(disabledSensorType, disabledTuple, message2, bulkMessageWriter, configurations, messageGetStrategy);
-    verify(collector, times(1)).ack(disabledTuple);
+    BulkWriterResponse expectedResponse = new BulkWriterResponse();
+    expectedResponse.addAllSuccesses(messageIds);
+    verify(bulkMessageWriter, times(1)).write(sensorType, configurations,
+            Arrays.asList(new BulkMessage<>(messageId1, message1), new BulkMessage<>(messageId2, message2)));
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
+    verify(flushPolicy, times(1)).onFlush(sensorType, expectedResponse);
 
-    verifyStatic(times(0));
-    ErrorUtils.handleError(eq(collector), any(MetronError.class));
+    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
   }
 
   @Test
-  public void writeShouldProperlyHandleWriterErrors() throws Exception {
-    Throwable e = new Exception("test exception");
-    MetronError expectedError1 = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message1));
-    MetronError expectedError2 = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR).withThrowable(e).withRawMessages(Collections.singletonList(message2));
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllErrors(e, tupleList);
-
-    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
-
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
-    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
-
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
-    verify(collector, times(1)).ack(tuple1);
-    verify(collector, times(1)).ack(tuple2);
-    verify(collector, times(1)).reportError(e);
-    verifyNoMoreInteractions(collector);
+  public void writeShouldFlushPreviousMessagesWhenDisabled() throws Exception {
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
+    BulkMessage<JSONObject> beforeDisabledMessage = messages.get(0);
+    BulkMessage<JSONObject> afterDisabledMessage = messages.get(1);
+    BulkWriterResponse beforeDisabledResponse = new BulkWriterResponse();
+    beforeDisabledResponse.addSuccess(beforeDisabledMessage.getId());
+    BulkWriterResponse afterDisabledResponse = new BulkWriterResponse();
+    afterDisabledResponse.addSuccess(afterDisabledMessage.getId());
+
+    when(bulkMessageWriter.write(sensorType, configurations, Collections.singletonList(messages.get(0)))).thenReturn(beforeDisabledResponse);
+
+    bulkWriterComponent.write(sensorType, beforeDisabledMessage, bulkMessageWriter, configurations);
+
+    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
+    verify(flushPolicy, times(0)).onFlush(any(), any());
+
+    when(configurations.isEnabled(sensorType)).thenReturn(false);
+
+    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);
+
+    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, Collections.singletonList(messages.get(0)));
+    verify(flushPolicy, times(1)).onFlush(sensorType, beforeDisabledResponse);
+    verify(flushPolicy, times(1)).onFlush(sensorType, afterDisabledResponse);
+
+    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
   }
 
   @Test
-  public void writeShouldThrowExceptionWhenHandleErrorIsFalse() throws Exception {
-    exception.expect(IllegalStateException.class);
-
+  public void writeShouldProperlyHandleWriterErrors() throws Exception {
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
     Throwable e = new Exception("test exception");
     BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllErrors(e, tupleList);
+    response.addAllErrors(e, messageIds);
+
+    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenReturn(response);
+
+    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);
 
-    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenReturn(response);
+    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
+    verify(flushPolicy, times(0)).onFlush(any(), any());
 
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector, true, false);
-    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+    reset(flushPolicy);
+
+    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);
+
+    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);
+
+    BulkWriterResponse expectedErrorResponse = new BulkWriterResponse();
+    expectedErrorResponse.addAllErrors(e, messageIds);
+
+    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, messages);
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
+    verify(flushPolicy, times(1)).onFlush(sensorType, expectedErrorResponse);
+
+    verifyNoMoreInteractions(bulkMessageWriter, flushPolicy);
   }
 
   @Test
   public void writeShouldProperlyHandleWriterException() throws Exception {
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
     Throwable e = new Exception("test exception");
-    MetronError expectedError1 = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e)
-            .withRawMessages(Collections.singletonList(message1));
-    MetronError expectedError2 = new MetronError()
-            .withSensorType(Collections.singleton(sensorType))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e)
-            .withRawMessages(Collections.singletonList(message2));
     BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllErrors(e, tupleList);
-
-    when(bulkMessageWriter.write(sensorType, configurations, Arrays.asList(tuple1, tuple2), Arrays.asList(message1, message2))).thenThrow(e);
-
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
-    bulkWriterComponent.write(sensorType, tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write(sensorType, tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
-
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(expectedError1.getJSONObject()))));
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(expectedError2.getJSONObject()))));
-    verify(collector, times(1)).ack(tuple1);
-    verify(collector, times(1)).ack(tuple2);
-    verify(collector, times(1)).reportError(e);
-    verifyNoMoreInteractions(collector);
-  }
+    response.addAllErrors(e, messageIds);
 
-  @Test
-  public void errorAllShouldClearMapsAndHandleErrors() throws Exception {
-    Throwable e = new Exception("test exception");
-    MetronError error1 = new MetronError()
-            .withSensorType(Collections.singleton("sensor1"))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e)
-            .withRawMessages(Collections.singletonList(message1));
-    MetronError error2 = new MetronError()
-            .withSensorType(Collections.singleton("sensor2"))
-            .withErrorType(Constants.ErrorType.INDEXING_ERROR)
-            .withThrowable(e)
-            .withRawMessages(Collections.singletonList(message2));
-
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector);
-    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.errorAll(e, messageGetStrategy);
-
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(error1.getJSONObject()))));
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM),
-            new Values(argThat(new MetronErrorJSONMatcher(error2.getJSONObject()))));
-    verify(collector, times(1)).ack(tuple1);
-    verify(collector, times(1)).ack(tuple2);
-    verify(collector, times(2)).reportError(e);
-    verifyNoMoreInteractions(collector);
-
-    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    verify(bulkMessageWriter, times(0)).write(sensorType, configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
+    when(bulkMessageWriter.write(sensorType, configurations, messages)).thenThrow(e);
+
+    bulkWriterComponent.write(sensorType, messages.get(0), bulkMessageWriter, configurations);
+
+    verify(bulkMessageWriter, times(0)).write(eq(sensorType), eq(configurations), any());
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages.subList(0, 1));
+    verify(flushPolicy, times(0)).onFlush(any(), any());
+
+    reset(flushPolicy);
+
+    when(flushPolicy.shouldFlush(sensorType, configurations, messages)).thenReturn(true);
+
+    bulkWriterComponent.write(sensorType, messages.get(1), bulkMessageWriter, configurations);
+
+    BulkWriterResponse expectedErrorResponse = new BulkWriterResponse();
+    expectedErrorResponse.addAllErrors(e, messageIds);
+
+    verify(bulkMessageWriter, times(1)).write(sensorType, configurations, messages);
+    verify(flushPolicy, times(1)).shouldFlush(sensorType, configurations, messages);
+    verify(flushPolicy, times(1)).onFlush(sensorType, expectedErrorResponse);
+
+    verifyNoMoreInteractions(flushPolicy);
   }
 
   @Test
   public void flushShouldAckMissingTuples() throws Exception{
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
     BulkMessageWriter<JSONObject> bulkMessageWriter = mock(BulkMessageWriter.class);
-    Tuple successTuple = mock(Tuple.class);
-    Tuple errorTuple = mock(Tuple.class);
-    Tuple missingTuple = mock(Tuple.class);
-    Collection<Tuple> tupleList = Arrays.asList(successTuple, errorTuple, missingTuple);
+    MessageId successId = new MessageId("successId");
+    MessageId errorId = new MessageId("errorId");
+    MessageId missingId = new MessageId("missingId");
     JSONObject successMessage = new JSONObject();
     successMessage.put("name", "success");
     JSONObject errorMessage = new JSONObject();
     errorMessage.put("name", "error");
     JSONObject missingMessage = new JSONObject();
     missingMessage.put("name", "missing");
-    List<JSONObject> messageList = Arrays.asList(successMessage, errorMessage, missingMessage);
-    OutputCollector collector = mock(OutputCollector.class);
+    List<BulkMessage<JSONObject>> allMessages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage<>(successId, successMessage));
+      add(new BulkMessage<>(errorId, errorMessage));
+      add(new BulkMessage<>(missingId, missingMessage));
+    }};
     BulkWriterResponse bulkWriterResponse = new BulkWriterResponse();
-    bulkWriterResponse.addSuccess(successTuple);
+    bulkWriterResponse.addSuccess(successId);
     Throwable throwable = mock(Throwable.class);
-    bulkWriterResponse.addError(throwable, errorTuple);
+    bulkWriterResponse.addError(throwable, errorId);
 
-    when(bulkMessageWriter.write(sensorType, configurations, tupleList, messageList)).thenReturn(bulkWriterResponse);
+    when(bulkMessageWriter.write(sensorType, configurations, allMessages)).thenReturn(bulkWriterResponse);
 
-    BulkWriterComponent bulkWriterComponent = new BulkWriterComponent(collector, true, true);
-    bulkWriterComponent.flush(sensorType, bulkMessageWriter, configurations, messageGetStrategy, tupleList, messageList);
+    bulkWriterComponent.flush(sensorType, bulkMessageWriter, configurations, allMessages);
 
-    verify(collector, times(1)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
-    verify(collector, times(1)).reportError(throwable);
-    verify(collector, times(1)).ack(successTuple);
-    verify(collector, times(1)).ack(errorTuple);
-    verify(collector, times(1)).ack(missingTuple);
-    verifyNoMoreInteractions(collector);
-  }
+    BulkWriterResponse expectedResponse = new BulkWriterResponse();
+    expectedResponse.addSuccess(successId);
+    expectedResponse.addError(throwable, errorId);
+    expectedResponse.addSuccess(missingId);
 
-  @Test
-  public void flushTimeoutsShouldFlushAllMessagesAfterDefaultTimeout() throws Exception {
-    Clock clock = mock(Clock.class);
-    BulkMessageWriter<JSONObject> bulkMessageWriter = mock(BulkMessageWriter.class);
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector, false, false).withClock(clock);
-    assertEquals(6, bulkWriterComponent.getDefaultBatchTimeout());
-
-    BulkWriterResponse response1 = new BulkWriterResponse();
-    response1.addSuccess(tuple1);
-    when(bulkMessageWriter.write("sensor1", configurations, Collections.singletonList(tuple1), Collections.singletonList(message1))).thenReturn(response1);
-    BulkWriterResponse response2 = new BulkWriterResponse();
-    response1.addSuccess(tuple2);
-    when(bulkMessageWriter.write("sensor2", configurations, Collections.singletonList(tuple2), Collections.singletonList(message2))).thenReturn(response2);
-
-    when(clock.currentTimeMillis()).thenReturn(0L);
-    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
-
-    when(clock.currentTimeMillis()).thenReturn(1000L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(2000L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(3000L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(4000L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(5000L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(5999L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(6000L); // triggers timeout
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-
-    verify(bulkMessageWriter, times(1)).write("sensor1", configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
-    verify(bulkMessageWriter, times(1)).write("sensor2", configurations, Collections.singletonList(tuple2), Collections.singletonList(message2));
-
-    when(clock.currentTimeMillis()).thenReturn(6001L); // no trigger
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-
-    verifyNoMoreInteractions(bulkMessageWriter);
+    verify(flushPolicy, times(1)).onFlush(sensorType, expectedResponse);
+    verifyNoMoreInteractions(flushPolicy);
   }
 
   @Test
-  public void flushTimeoutsShouldFlushAllMessagesAfterConfiguredTimeout() throws Exception {
-    Clock clock = mock(Clock.class);
-    BulkMessageWriter<JSONObject> bulkMessageWriter = mock(BulkMessageWriter.class);
-    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(collector, false, false).withClock(clock);
-    bulkWriterComponent.setDefaultBatchTimeout(1);
-
-    BulkWriterResponse response1 = new BulkWriterResponse();
-    response1.addSuccess(tuple1);
-    when(bulkMessageWriter.write("sensor1", configurations, Collections.singletonList(tuple1), Collections.singletonList(message1))).thenReturn(response1);
-    BulkWriterResponse response2 = new BulkWriterResponse();
-    response1.addSuccess(tuple2);
-    when(bulkMessageWriter.write("sensor2", configurations, Collections.singletonList(tuple2), Collections.singletonList(message2))).thenReturn(response2);
+  public void flushAllShouldFlushAllSensors() {
+    BulkWriterComponent<JSONObject> bulkWriterComponent = new BulkWriterComponent<>(Collections.singletonList(flushPolicy));
 
-    when(clock.currentTimeMillis()).thenReturn(0L);
-    bulkWriterComponent.write("sensor1", tuple1, message1, bulkMessageWriter, configurations, messageGetStrategy);
-    bulkWriterComponent.write("sensor2", tuple2, message2, bulkMessageWriter, configurations, messageGetStrategy);
+    bulkWriterComponent.write("sensor1", messages.get(0), bulkMessageWriter, configurations);
+    bulkWriterComponent.write("sensor2", messages.get(1), bulkMessageWriter, configurations);
 
-    when(clock.currentTimeMillis()).thenReturn(999L);
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
-    when(clock.currentTimeMillis()).thenReturn(1000L); // triggers timeout
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
+    reset(flushPolicy);
 
-    verify(bulkMessageWriter, times(1)).write("sensor1", configurations, Collections.singletonList(tuple1), Collections.singletonList(message1));
-    verify(bulkMessageWriter, times(1)).write("sensor2", configurations, Collections.singletonList(tuple2), Collections.singletonList(message2));
+    bulkWriterComponent.flushAll(bulkMessageWriter, configurations);
 
-    when(clock.currentTimeMillis()).thenReturn(1001L); // no trigger
-    bulkWriterComponent.flushTimeouts(bulkMessageWriter, configurations, messageGetStrategy);
+    verify(flushPolicy, times(1)).shouldFlush("sensor1", configurations, messages.subList(0, 1));
+    verify(flushPolicy, times(1)).shouldFlush("sensor2", configurations, messages.subList(1, 2));
 
-    verifyNoMoreInteractions(bulkMessageWriter);
+    verifyNoMoreInteractions(flushPolicy);
   }
 }
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
index 6d9b62e..6fafbec 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BatchTimeoutHelperTest.java
@@ -38,33 +38,33 @@ public class BatchTimeoutHelperTest {
   private final TimeoutListSupplier illegalTimeoutsList = new TimeoutListSupplier(Arrays.asList(5, 2, -3, 6));
 
   @Test
-  public void testGetDefaultBatchTimeout() throws Exception {
-    //The defaultBatchTimeout is dependent only on batchTimeoutDivisor and the Storm config
+  public void testGetMaxBatchTimeout() throws Exception {
+    //The maxBatchTimeout is dependent only on batchTimeoutDivisor and the Storm config
     //and CLI overrides, which aren't of interest here.
     assertEquals(30, Utils.readStormConfig().getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 0));
     BatchTimeoutHelper bth;
     bth = new BatchTimeoutHelper(defaultConfigList, 1);
-    assertEquals(14, bth.getDefaultBatchTimeout());
+    assertEquals(14, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(defaultConfigList, 2);
-    assertEquals(6, bth.getDefaultBatchTimeout());
+    assertEquals(6, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(defaultConfigList, 3);
-    assertEquals(4, bth.getDefaultBatchTimeout());
+    assertEquals(4, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(defaultConfigList, 4);
-    assertEquals(2, bth.getDefaultBatchTimeout());
+    assertEquals(2, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(defaultConfigList, 6);
-    assertEquals(1, bth.getDefaultBatchTimeout());
+    assertEquals(1, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(defaultConfigList, 20);
-    assertEquals(1, bth.getDefaultBatchTimeout());
+    assertEquals(1, bth.getMaxBatchTimeout());
 
     bth = new BatchTimeoutHelper(disabledConfigList, 2);
-    assertEquals(6, bth.getDefaultBatchTimeout());
+    assertEquals(6, bth.getMaxBatchTimeout());
     bth = new BatchTimeoutHelper(smallTimeoutsList, 2);
-    assertEquals(6, bth.getDefaultBatchTimeout());
+    assertEquals(6, bth.getMaxBatchTimeout());
   }
 
   @Test
   public void testGetRecommendedTickInterval() throws Exception {
-    //The recommendedTickInterval is the min of defaultBatchTimeout and the configured TimeoutsList.
+    //The recommendedTickInterval is the min of maxBatchTimeout and the configured TimeoutsList.
     BatchTimeoutHelper bth;
     bth = new BatchTimeoutHelper(defaultConfigList, 2);
     assertEquals(6, bth.getRecommendedTickInterval());
diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
similarity index 50%
rename from metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
rename to metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
index 083628c..df80296 100644
--- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/bolt/BulkMessageWriterBoltTest.java
@@ -15,18 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.metron.enrichment.bolt;
+package org.apache.metron.writer.bolt;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import java.io.FileInputStream;
@@ -39,52 +42,28 @@ import org.apache.log4j.Level;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.message.MessageGetStrategy;
 import org.apache.metron.common.message.MessageGetters;
 import org.apache.metron.common.system.FakeClock;
 import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.apache.metron.writer.BulkWriterComponent;
-import org.apache.metron.writer.bolt.BulkMessageWriterBolt;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.hamcrest.Description;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.Matchers;
 import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
 
-  protected class MessageListMatcher extends ArgumentMatcher<List<JSONObject>> {
-
-    private List<JSONObject> expectedMessageList;
-
-    public MessageListMatcher(List<JSONObject> expectedMessageList) {
-      this.expectedMessageList = expectedMessageList;
-    }
-
-    @Override
-    public boolean matches(Object o) {
-      List<JSONObject> actualMessageList = (List<JSONObject>) o;
-      for(JSONObject message: actualMessageList) removeTimingFields(message);
-      return expectedMessageList.equals(actualMessageList);
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText(String.format("[%s]", expectedMessageList));
-    }
-
-  }
-
   /**
    * {
    * "field": "value",
@@ -94,8 +73,13 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
   @Multiline
   private String sampleMessageString;
 
+  @Mock
+  private BulkMessageWriter<JSONObject> bulkMessageWriter;
+
+  private BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt;
   private JSONObject sampleMessage;
-  private List<JSONObject> messageList;
+  private List<MessageId> messageIdList;
+  private List<BulkMessage<JSONObject>> messageList;
   private List<JSONObject> fullMessageList;
   private List<Tuple> tupleList;
 
@@ -104,23 +88,41 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     JSONParser parser = new JSONParser();
     fullMessageList = new ArrayList<>();
     sampleMessage = (JSONObject) parser.parse(sampleMessageString);
+    sampleMessage.put(Constants.GUID, "message1");
     sampleMessage.put("field", "value1");
     fullMessageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put(Constants.GUID, "message2");
     sampleMessage.put("field", "value2");
     fullMessageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put(Constants.GUID, "message3");
     sampleMessage.put("field", "value3");
     fullMessageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put(Constants.GUID, "message4");
     sampleMessage.put("field", "value4");
     fullMessageList.add(((JSONObject) sampleMessage.clone()));
+    sampleMessage.put(Constants.GUID, "message5");
     sampleMessage.put("field", "value5");
     fullMessageList.add(((JSONObject) sampleMessage.clone()));
-  }
 
-  @Mock
-  private BulkMessageWriter<JSONObject> bulkMessageWriter;
-
-  @Mock
-  private MessageGetStrategy messageGetStrategy;
+    MockitoAnnotations.initMocks(this);
+    messageIdList = new ArrayList<>();
+    tupleList = new ArrayList<>();
+    messageList = new ArrayList<>();
+    bulkMessageWriterBolt = spy(new BulkMessageWriterBolt<IndexingConfigurations>(
+            "zookeeperUrl", "INDEXING")
+            .withBulkMessageWriter(bulkMessageWriter)
+            .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
+            .withMessageGetterField("message"));
+    for(int i = 0; i < 5; i++) {
+      String messageId = String.format("message%s", i + 1);
+      messageIdList.add(new MessageId(messageId));
+      JSONObject message = fullMessageList.get(i);
+      Tuple tuple = mock(Tuple.class);
+      when(tuple.getValueByField("message")).thenReturn(message);
+      tupleList.add(tuple);
+      messageList.add(new BulkMessage<>(messageId, message));
+    }
+  }
 
   @Test
   public void testSourceTypeMissing() throws Exception {
@@ -149,167 +151,149 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     bulkMessageWriterBolt.execute(tuple);
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
     verify(outputCollector, times(1)).ack(tuple);
+    verify(outputCollector, times(1)).reportError(any(Throwable.class));
+    verifyNoMoreInteractions(outputCollector);
   }
 
   @Test
   public void testFlushOnBatchSize() throws Exception {
-    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>(
-        "zookeeperUrl", "INDEXING")
-        .withBulkMessageWriter(bulkMessageWriter)
-        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-        .withMessageGetterField("message");
+    Map stormConf = new HashMap();
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
-    bulkMessageWriterBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("error"), argThat(
-            new FieldsMatcher("message")));
-    Map stormConf = new HashMap();
-    doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
-    try {
+    {
+      doThrow(new Exception()).when(bulkMessageWriter).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+      try {
+        bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
+        fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
+      } catch(RuntimeException e) {}
+      reset(bulkMessageWriter);
+    }
+    {
+      when(bulkMessageWriter.getName()).thenReturn("hdfs");
       bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
-      fail("A runtime exception should be thrown when bulkMessageWriter.init throws an exception");
-    } catch(RuntimeException e) {}
-    reset(bulkMessageWriter);
-    when(bulkMessageWriter.getName()).thenReturn("hdfs");
-    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector);
-    verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
-    tupleList = new ArrayList<>();
-    messageList = new ArrayList<>();
-    for(int i = 0; i < 4; i++) {
-      when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
-      tupleList.add(tuple);
-      messageList.add(fullMessageList.get(i));
-      bulkMessageWriterBolt.execute(tuple);
-      verify(bulkMessageWriter, times(0)).write(eq(sensorType)
-              , any(WriterConfiguration.class), eq(tupleList), eq(messageList));
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
     }
-    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(4));
-    tupleList.add(tuple);
-    messageList.add(fullMessageList.get(4));
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tupleList);
-    when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList)
-            , argThat(new MessageListMatcher(messageList)))).thenReturn(response);
-    bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
-            , any(WriterConfiguration.class), eq(tupleList)
-            , argThat(new MessageListMatcher(messageList)));
-    verify(outputCollector, times(5)).ack(tuple);
-    reset(outputCollector);
-    doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(WriterConfiguration.class)
-            , Matchers.anyListOf(Tuple.class), Matchers.anyListOf(JSONObject.class));
-    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(0));
-    UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL);
-    for(int i = 0; i < 5; i++) {
-      bulkMessageWriterBolt.execute(tuple);
+    {
+      for(int i = 0; i < 4; i++) {
+        bulkMessageWriterBolt.execute(tupleList.get(i));
+        verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+                , any(WriterConfiguration.class), anyList());
+      }
+      BulkWriterResponse response = new BulkWriterResponse();
+      response.addAllSuccesses(messageIdList);
+      when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(messageList))).thenReturn(response);
+      bulkMessageWriterBolt.execute(tupleList.get(4));
+      verify(bulkMessageWriter, times(1)).write(eq(sensorType)
+              , any(WriterConfiguration.class), eq(messageList));
+      tupleList.forEach(tuple -> verify(outputCollector, times(1)).ack(tuple));
+      reset(outputCollector);
     }
-    UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
-    verify(outputCollector, times(5)).ack(tuple);
-    verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
-    verify(outputCollector, times(1)).reportError(any(Throwable.class));
+    {
+      doThrow(new Exception()).when(bulkMessageWriter).write(eq(sensorType), any(WriterConfiguration.class)
+              , anyList());
+      UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.FATAL);
+      for(int i = 0; i < 5; i++) {
+        bulkMessageWriterBolt.execute(tupleList.get(i));
+      }
+      UnitTestHelper.setLog4jLevel(BulkWriterComponent.class, Level.ERROR);
+      tupleList.forEach(tuple -> verify(outputCollector, times(1)).ack(tuple));
+      verify(outputCollector, times(5)).emit(eq(Constants.ERROR_STREAM), any(Values.class));
+      verify(outputCollector, times(1)).reportError(any(Throwable.class));
+    }
+    verifyNoMoreInteractions(outputCollector);
   }
 
   @Test
   public void testFlushOnBatchTimeout() throws Exception {
     FakeClock clock = new FakeClock();
-    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>(
-        "zookeeperUrl", "INDEXING")
-        .withBulkMessageWriter(bulkMessageWriter)
-        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-        .withMessageGetterField("message")
-        .withBatchTimeoutDivisor(3);
+    bulkMessageWriterBolt = bulkMessageWriterBolt.withBatchTimeoutDivisor(3);
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType,
             new FileInputStream(sampleSensorIndexingConfigPath));
-    bulkMessageWriterBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("error")
-            , argThat(new FieldsMatcher("message")));
-    Map stormConf = new HashMap();
-    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
-    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
-    verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
-    int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout();
-    assertEquals(4, batchTimeout);
-    tupleList = new ArrayList<>();
-    messageList = new ArrayList<>();
-    for(int i = 0; i < 3; i++) {
-      when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
-      tupleList.add(tuple);
-      messageList.add(fullMessageList.get(i));
-      bulkMessageWriterBolt.execute(tuple);
-      verify(bulkMessageWriter, times(0)).write(eq(sensorType)
-              , any(WriterConfiguration.class), eq(tupleList), eq(messageList));
+    {
+      bulkMessageWriterBolt.declareOutputFields(declarer);
+      verify(declarer, times(1)).declareStream(eq("error")
+              , argThat(new FieldsMatcher("message")));
     }
-    clock.elapseSeconds(5);
-    when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(3));
-    tupleList.add(tuple);
-    messageList.add(fullMessageList.get(3));
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tupleList);
-    when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList)
-            , argThat(new MessageListMatcher(messageList)))).thenReturn(response);
-    bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
-            , any(WriterConfiguration.class)
-            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
-    verify(outputCollector, times(4)).ack(tuple);
+    {
+      Map stormConf = new HashMap();
+      when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
+      bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class), any(WriterConfiguration.class));
+    }
+    {
+      int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
+      assertEquals(4, batchTimeout);
+      for(int i = 0; i < 4; i++) {
+        bulkMessageWriterBolt.execute(tupleList.get(i));
+        verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+                , any(WriterConfiguration.class), any(List.class));
+      }
+      clock.elapseSeconds(5);
+      BulkWriterResponse response = new BulkWriterResponse();
+      response.addAllSuccesses(messageIdList);
+
+      when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(messageList))).thenReturn(response);
+      bulkMessageWriterBolt.execute(tupleList.get(4));
+      verify(bulkMessageWriter, times(1)).write(eq(sensorType), any(WriterConfiguration.class), eq(messageList));
+      tupleList.forEach(tuple -> verify(outputCollector, times(1)).ack(tuple));
+    }
+    verifyNoMoreInteractions(outputCollector);
   }
 
   @Test
   public void testFlushOnTickTuple() throws Exception {
     FakeClock clock = new FakeClock();
-    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>(
-        "zookeeperUrl", "INDEXING")
-        .withBulkMessageWriter(bulkMessageWriter)
-        .withMessageGetter(MessageGetters.JSON_FROM_FIELD.name())
-        .withMessageGetterField("message");
     bulkMessageWriterBolt.setCuratorFramework(client);
     bulkMessageWriterBolt.setZKCache(cache);
     bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType
             , new FileInputStream(sampleSensorIndexingConfigPath));
-    bulkMessageWriterBolt.declareOutputFields(declarer);
-    verify(declarer, times(1)).declareStream(eq("error")
-            , argThat(new FieldsMatcher("message")));
-    Map stormConf = new HashMap();
-    when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
-    bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
-    verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class)
-            , any(WriterConfiguration.class));
-    int batchTimeout = bulkMessageWriterBolt.getDefaultBatchTimeout();
-    assertEquals(14, batchTimeout);
-    tupleList = new ArrayList<>();
-    messageList = new ArrayList<>();
-    for(int i = 0; i < 3; i++) {
-      when(tuple.getValueByField("message")).thenReturn(fullMessageList.get(i));
-      tupleList.add(tuple);
-      messageList.add(fullMessageList.get(i));
-      bulkMessageWriterBolt.execute(tuple);
+    {
+      bulkMessageWriterBolt.declareOutputFields(declarer);
+      verify(declarer, times(1)).declareStream(eq("error")
+              , argThat(new FieldsMatcher("message")));
+    }
+    {
+      Map stormConf = new HashMap();
+      when(bulkMessageWriter.getName()).thenReturn("elasticsearch");
+      bulkMessageWriterBolt.prepare(stormConf, topologyContext, outputCollector, clock);
+      verify(bulkMessageWriter, times(1)).init(eq(stormConf),any(TopologyContext.class)
+              , any(WriterConfiguration.class));
+    }
+    {
+      int batchTimeout = bulkMessageWriterBolt.getMaxBatchTimeout();
+      assertEquals(14, batchTimeout);
+      for(int i = 0; i < 5; i++) {
+        bulkMessageWriterBolt.execute(tupleList.get(i));
+        verify(bulkMessageWriter, times(0)).write(eq(sensorType)
+                , any(WriterConfiguration.class), any());
+      }
+      Tuple tickTuple = mock(Tuple.class);
+      when(tickTuple.getValueByField("message")).thenReturn(null);
+      when(tickTuple.getSourceComponent()).thenReturn("__system"); //mark the tuple as a TickTuple, part 1 of 2
+      when(tickTuple.getSourceStreamId()).thenReturn("__tick");    //mark the tuple as a TickTuple, part 2 of 2
+      BulkWriterResponse response = new BulkWriterResponse();
+      response.addAllSuccesses(messageIdList);
+      when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(messageList))).thenReturn(response);
+      clock.advanceToSeconds(2);
+      bulkMessageWriterBolt.execute(tickTuple);
       verify(bulkMessageWriter, times(0)).write(eq(sensorType)
-              , any(WriterConfiguration.class), eq(tupleList), eq(messageList));
+              , any(WriterConfiguration.class)
+              , eq(messageList));
+      verify(outputCollector, times(1)).ack(tickTuple);  // 1 tick
+      clock.advanceToSeconds(9);
+      bulkMessageWriterBolt.execute(tickTuple);
+      verify(bulkMessageWriter, times(1)).write(eq(sensorType)
+              , any(WriterConfiguration.class)
+              , eq(messageList));
+      assertEquals(5, tupleList.size());
+      tupleList.forEach(tuple -> verify(outputCollector, times(1)).ack(tuple));
+      verify(outputCollector, times(2)).ack(tickTuple);
     }
-    when(tuple.getValueByField("message")).thenReturn(null);
-    when(tuple.getSourceComponent()).thenReturn("__system"); //mark the tuple as a TickTuple, part 1 of 2
-    when(tuple.getSourceStreamId()).thenReturn("__tick");    //mark the tuple as a TickTuple, part 2 of 2
-    BulkWriterResponse response = new BulkWriterResponse();
-    response.addAllSuccesses(tupleList);
-    when(bulkMessageWriter.write(eq(sensorType), any(WriterConfiguration.class), eq(tupleList)
-            , argThat(new MessageListMatcher(messageList)))).thenReturn(response);
-    clock.advanceToSeconds(2);
-    bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(0)).write(eq(sensorType)
-            , any(WriterConfiguration.class)
-            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
-    verify(outputCollector, times(1)).ack(tuple);  // 1 tick
-    clock.advanceToSeconds(9);
-    bulkMessageWriterBolt.execute(tuple);
-    verify(bulkMessageWriter, times(1)).write(eq(sensorType)
-            , any(WriterConfiguration.class)
-            , eq(tupleList), argThat(new MessageListMatcher(messageList)));
-    assertEquals(3, tupleList.size());
-    verify(outputCollector, times(5)).ack(tuple);  // 3 messages + 2nd tick
+    verifyNoMoreInteractions(outputCollector);
   }
 
   /**
@@ -322,7 +306,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
 
     // setup the bolt
     BulkMessageWriterBolt<IndexingConfigurations> bolt = new BulkMessageWriterBolt<IndexingConfigurations>(
-        "zookeeperUrl", "INDEXING")
+            "zookeeperUrl", "INDEXING")
             .withBulkMessageWriter(bulkMessageWriter)
             .withMessageGetter(MessageGetters.JSON_FROM_POSITION.name())
             .withMessageGetterField("message");
@@ -343,6 +327,18 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest {
     // the tuple should be handled as an error and ack'd
     verify(outputCollector, times(1)).emit(eq(Constants.ERROR_STREAM), any());
     verify(outputCollector, times(1)).ack(tuple);
+    verify(outputCollector, times(1)).reportError(any(Throwable.class));
+    verifyNoMoreInteractions(outputCollector);
+  }
+
+  @Test
+  public void testDeclareOutputFields() {
+    BulkMessageWriterBolt<IndexingConfigurations> bulkMessageWriterBolt = new BulkMessageWriterBolt<IndexingConfigurations>(
+            "zookeeperUrl", "INDEXING");
+
+    bulkMessageWriterBolt.declareOutputFields(declarer);
+    verify(declarer, times(1)).declareStream(eq("error")
+            , argThat(new FieldsMatcher("message")));
   }
 
 }
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 09ecafc..1d71c8a 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -29,12 +29,11 @@ import java.util.Map;
 import org.apache.metron.common.configuration.IndexingConfigurations;
 import org.apache.metron.common.configuration.writer.IndexingWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
 import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
-import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
 import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
 import org.junit.Before;
@@ -250,13 +249,12 @@ public class HdfsWriterTest {
     JSONObject message2 = new JSONObject();
     message2.put("test.key", "test.value3");
     message2.put("test.key2", "test.value2");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-    messages.add(message2);
+    List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage("message1", message));
+      add(new BulkMessage("message2", message2));
+    }};
 
-    ArrayList<Tuple> tuples = new ArrayList<>();
-
-    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, messages);
     writer.close();
 
     ArrayList<String> expected = new ArrayList<>();
@@ -295,13 +293,12 @@ public class HdfsWriterTest {
     JSONObject message2 = new JSONObject();
     message2.put("test.key", "test.value");
     message2.put("test.key3", "test.value2");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-    messages.add(message2);
-
-    ArrayList<Tuple> tuples = new ArrayList<>();
+    List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage<>("message1", message));
+      add(new BulkMessage<>("message2", message2));
+    }};
 
-    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, messages);
     writer.close();
 
     ArrayList<String> expected = new ArrayList<>();
@@ -339,13 +336,12 @@ public class HdfsWriterTest {
     JSONObject message2 = new JSONObject();
     message2.put("test.key", "test.value2");
     message2.put("test.key3", "test.value3");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-    messages.add(message2);
+    List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage("message1", message));
+      add(new BulkMessage("message2", message2));
+    }};
 
-    ArrayList<Tuple> tuples = new ArrayList<>();
-
-    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, messages);
     writer.close();
 
     ArrayList<String> expected1 = new ArrayList<>();
@@ -392,12 +388,11 @@ public class HdfsWriterTest {
     // These two messages will be routed to the same folder, because test.key is the same
     JSONObject message = new JSONObject();
     message.put("test.key2", "test.value2");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-
-    ArrayList<Tuple> tuples = new ArrayList<>();
+    List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage("message1", message));
+    }};
 
-    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config,messages);
     writer.close();
 
     ArrayList<String> expected = new ArrayList<>();
@@ -425,15 +420,15 @@ public class HdfsWriterTest {
 
     JSONObject message = new JSONObject();
     message.put("test.key", "test.value");
-    ArrayList<JSONObject> messages = new ArrayList<>();
-    messages.add(message);
-    ArrayList<Tuple> tuples = new ArrayList<>();
+    List<BulkMessage<JSONObject>> messages = new ArrayList<BulkMessage<JSONObject>>() {{
+      add(new BulkMessage("message1", message));
+    }};
 
     CountSyncPolicy basePolicy = new CountSyncPolicy(5);
     ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
 
-    writer.write(SENSOR_NAME, config, tuples, messages);
-    writer.write(SENSOR_NAME, config, tuples, messages);
+    writer.write(SENSOR_NAME, config, messages);
+    writer.write(SENSOR_NAME, config, messages);
     writer.close();
 
     File outputFolder = new File(folder.getAbsolutePath() + "/test-test.value/test.value/");
diff --git a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
index 9d201b8..cac3d0b 100644
--- a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
+++ b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/kafka/KafkaWriterTest.java
@@ -19,20 +19,47 @@
 package org.apache.metron.writer.kafka;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.ParserConfigurations;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.common.configuration.writer.ParserWriterConfiguration;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessage;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.common.writer.MessageId;
 import org.json.simple.JSONObject;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class KafkaWriterTest {
 
+  @Mock
+  private KafkaProducer kafkaProducer;
 
   public static final String SENSOR_TYPE = "test";
   public WriterConfiguration createConfiguration(final Map<String, Object> parserConfig) {
@@ -45,6 +72,11 @@ public class KafkaWriterTest {
     return new ParserWriterConfiguration(configurations);
   }
 
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+  }
+
   @Test
   public void testHappyPathGlobalConfig() throws Exception {
     KafkaWriter writer = new KafkaWriter();
@@ -58,12 +90,12 @@ public class KafkaWriterTest {
 
     writer.configure(SENSOR_TYPE, configuration);
     Map<String, Object> producerConfigs = writer.createProducerConfigs();
-    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
-    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
-    Assert.assertEquals(producerConfigs.get("key1"), 1);
-    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+    assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    assertEquals(producerConfigs.get("request.required.acks"), 1);
+    assertEquals(producerConfigs.get("key1"), 1);
+    assertEquals(producerConfigs.get("key2"), "value2");
   }
 
   @Test
@@ -80,12 +112,12 @@ public class KafkaWriterTest {
 
     writer.configure(SENSOR_TYPE, configuration);
     Map<String, Object> producerConfigs = writer.createProducerConfigs();
-    Assert.assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
-    Assert.assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
-    Assert.assertEquals(producerConfigs.get("request.required.acks"), 1);
-    Assert.assertEquals(producerConfigs.get("key1"), 1);
-    Assert.assertEquals(producerConfigs.get("key2"), "value2");
+    assertEquals(producerConfigs.get("bootstrap.servers"), "localhost:6667");
+    assertEquals(producerConfigs.get("key.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    assertEquals(producerConfigs.get("value.serializer"), "org.apache.kafka.common.serialization.StringSerializer");
+    assertEquals(producerConfigs.get("request.required.acks"), 1);
+    assertEquals(producerConfigs.get("key1"), 1);
+    assertEquals(producerConfigs.get("key2"), "value2");
   }
 
   @Test
@@ -101,7 +133,7 @@ public class KafkaWriterTest {
     );
 
     writer.configure(SENSOR_TYPE, configuration);
-    Assert.assertEquals( "metron"
+    assertEquals( "metron"
                        , writer.getKafkaTopic(new JSONObject() {{
                           put("kafka_topic", "metron");
                          }}).get()
@@ -122,7 +154,7 @@ public class KafkaWriterTest {
     );
 
     writer.configure(SENSOR_TYPE, configuration);
-    Assert.assertEquals( "metron"
+    assertEquals( "metron"
                        , writer.getKafkaTopic(new JSONObject() {{
                           put("kafka_topic", "metron");
                          }}).get()
@@ -141,11 +173,83 @@ public class KafkaWriterTest {
     );
 
     writer.configure(SENSOR_TYPE, configuration);
-    Assert.assertEquals(Constants.ENRICHMENT_TOPIC
+    assertEquals(Constants.ENRICHMENT_TOPIC
                        , writer.getKafkaTopic(new JSONObject() {{
                           put("kafka_topic", "metron");
                          }}).get()
                        );
     Assert.assertTrue( writer.getKafkaTopic(new JSONObject()).isPresent() );
   }
+
+  @Test
+  public void testWriterShouldReturnResponse() throws Exception {
+    KafkaWriter writer = spy(new KafkaWriter());
+    writer.setKafkaProducer(kafkaProducer);
+
+    List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+    JSONObject successMessage = new JSONObject();
+    successMessage.put("value", "success");
+    JSONObject errorMessage = new JSONObject();
+    errorMessage.put("value", "error");
+    JSONObject droppedMessage = new JSONObject();
+    droppedMessage.put("value", "dropped");
+    messages.add(new BulkMessage<>("successId", successMessage));
+    messages.add(new BulkMessage<>("errorId", errorMessage));
+    messages.add(new BulkMessage<>("droppedId", droppedMessage));
+
+    doReturn(Optional.of("successTopic")).when(writer).getKafkaTopic(successMessage);
+    doReturn(Optional.of("errorTopic")).when(writer).getKafkaTopic(errorMessage);
+    doReturn(Optional.empty()).when(writer).getKafkaTopic(droppedMessage);
+
+    Future successFuture = mock(Future.class);
+    Future errorFuture = mock(Future.class);
+    ExecutionException throwable = new ExecutionException(new Exception("kafka error"));
+    when(kafkaProducer.send(new ProducerRecord<String, String>("errorTopic", "{\"value\":\"error\"}"))).thenReturn(errorFuture);
+    when(kafkaProducer.send(new ProducerRecord<String, String>("successTopic", "{\"value\":\"success\"}"))).thenReturn(successFuture);
+
+    when(errorFuture.get()).thenThrow(throwable);
+
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addSuccess(new MessageId("successId"));
+    response.addError(throwable, new MessageId("errorId"));
+
+    assertEquals(response, writer.write(SENSOR_TYPE, createConfiguration(new HashMap<>()), messages));
+    verify(kafkaProducer, times(1)).flush();
+    verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("successTopic", "{\"value\":\"success\"}"));
+    verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("errorTopic", "{\"value\":\"error\"}"));
+    verifyNoMoreInteractions(kafkaProducer);
+  }
+
+  @Test
+  public void testWriteShouldReturnErrorsOnFailedFlush() throws Exception {
+    KafkaWriter writer = spy(new KafkaWriter());
+    writer.setKafkaProducer(kafkaProducer);
+
+    List<BulkMessage<JSONObject>> messages = new ArrayList<>();
+    JSONObject message1 = new JSONObject();
+    message1.put("value", "message1");
+    JSONObject message2 = new JSONObject();
+    message2.put("value", "message2");
+    messages.add(new BulkMessage<>("messageId1", message1));
+    messages.add(new BulkMessage<>("messageId2", message2));
+
+    doReturn(Optional.of("topic1")).when(writer).getKafkaTopic(message1);
+    doReturn(Optional.of("topic2")).when(writer).getKafkaTopic(message2);
+
+    Future future1 = mock(Future.class);
+    Future future2 = mock(Future.class);
+    when(kafkaProducer.send(new ProducerRecord<String, String>("topic1", "{\"value\":\"message1\"}"))).thenReturn(future1);
+    when(kafkaProducer.send(new ProducerRecord<String, String>("topic2", "{\"value\":\"message2\"}"))).thenReturn(future2);
+    InterruptException throwable = new InterruptException("kafka flush exception");
+    doThrow(throwable).when(kafkaProducer).flush();
+
+    BulkWriterResponse response = new BulkWriterResponse();
+    response.addAllErrors(throwable, Arrays.asList(new MessageId("messageId1"), new MessageId("messageId2")));
+
+    assertEquals(response, writer.write(SENSOR_TYPE, createConfiguration(new HashMap<>()), messages));
+    verify(kafkaProducer, times(1)).flush();
+    verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("topic1", "{\"value\":\"message1\"}"));
+    verify(kafkaProducer, times(1)).send(new ProducerRecord<String, String>("topic2", "{\"value\":\"message2\"}"));
+    verifyNoMoreInteractions(kafkaProducer);
+  }
 }


Mime
View raw message