metron-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nickal...@apache.org
Subject [metron] branch master updated: METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
Date Tue, 11 Dec 2018 20:24:25 GMT
This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/master by this push:
     new ec3b98f  METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
ec3b98f is described below

commit ec3b98f762ce3726ed9a33abdb446957d1865dca
Author: nickwallen <nick@nickallen.org>
AuthorDate: Tue Dec 11 12:59:08 2018 -0500

    METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254
---
 .../elasticsearch/bulk/BulkDocumentWriter.java     |  45 +++
 .../bulk/BulkDocumentWriterResults.java            |  68 ++++
 .../bulk/ElasticsearchBulkDocumentWriter.java      | 166 ++++++++++
 .../metron/elasticsearch/bulk/WriteFailure.java    |  48 +++
 .../metron/elasticsearch/bulk/WriteSuccess.java    |  36 +++
 .../metron/elasticsearch/dao/ElasticsearchDao.java |   2 +-
 .../elasticsearch/dao/ElasticsearchUpdateDao.java  | 144 ++++-----
 .../elasticsearch/writer/ElasticsearchWriter.java  | 157 +++++----
 .../elasticsearch/writer/TupleBasedDocument.java   |  44 +++
 .../bulk/ElasticsearchBulkDocumentWriterTest.java  | 178 ++++++++++
 .../components/ElasticSearchComponent.java         |  46 +--
 .../writer/ElasticsearchWriterTest.java            | 360 ++++++++++++++-------
 .../src/test/resources/log4j.properties            |   0
 13 files changed, 994 insertions(+), 300 deletions(-)

diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
new file mode 100644
index 0000000..34f543e
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Writes documents to an index in bulk.
+ *
+ * @param <D> The type of document to write.
+ */
+public interface BulkDocumentWriter<D extends Document> {
+
+    /**
+     * Queues a document so that it is included in the next call to {@link #write()}.
+     *
+     * @param document The document to write.
+     * @param index The name of the index that the document should be written to.
+     */
+    void addDocument(D document, String index);
+
+    /**
+     * Reports how many documents have been added to the batch, but not yet written.
+     *
+     * @return The number of documents waiting to be written.
+     */
+    int size();
+
+    /**
+     * Writes every document that is currently in the batch.
+     *
+     * @return The results of the write, which distinguish the documents that were
+     *   written successfully from those that failed.
+     */
+    BulkDocumentWriterResults<D> write();
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
new file mode 100644
index 0000000..90e5ce3
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The result of writing documents in bulk using a {@link BulkDocumentWriter}.
+ * @param <D> The type of documents to write.
+ */
+public class BulkDocumentWriterResults<D extends Document> {
+
+  /** The documents that were written successfully, in the order they were added. */
+  private final List<WriteSuccess<D>> successes;
+
+  /** The documents that failed to be written, in the order they were added. */
+  private final List<WriteFailure<D>> failures;
+
+  /**
+   * Creates a set of results containing no successes and no failures.
+   */
+  public BulkDocumentWriterResults() {
+    this.successes = new ArrayList<>();
+    this.failures = new ArrayList<>();
+  }
+
+  /**
+   * Records a successful write.
+   * @param success The successful write to record.
+   */
+  public void add(WriteSuccess<D> success) {
+    this.successes.add(success);
+  }
+
+  /**
+   * Records that a document was written successfully.
+   * @param success The document that was written.
+   */
+  public void addSuccess(D success) {
+    add(new WriteSuccess<D>(success));
+  }
+
+  /**
+   * Records that each of the given documents was written successfully.
+   * @param successes The documents that were written.
+   */
+  public void addSuccesses(List<D> successes) {
+    for(D success: successes) {
+      addSuccess(success);
+    }
+  }
+
+  /**
+   * @return The successful writes recorded so far. This is the underlying list, not a copy.
+   */
+  public List<WriteSuccess<D>> getSuccesses() {
+    return successes;
+  }
+
+  /**
+   * Records a failed write.
+   * @param failure The failed write to record.
+   */
+  public void add(WriteFailure<D> failure) {
+    this.failures.add(failure);
+  }
+
+  /**
+   * Records that a document failed to be written.
+   * @param document The document that failed.
+   * @param cause The cause of the failure.
+   * @param message A message describing the failure.
+   */
+  public void addFailure(D document, Throwable cause, String message) {
+    // parameterize the failure, rather than using the raw type, so that the
+    // document type is checked by the compiler
+    add(new WriteFailure<D>(document, cause, message));
+  }
+
+  /**
+   * @return The failed writes recorded so far. This is the underlying list, not a copy.
+   */
+  public List<WriteFailure<D>> getFailures() {
+    return failures;
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
new file mode 100644
index 0000000..9e6e568
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Writes documents to an Elasticsearch index in bulk.
+ *
+ * @param <D> The type of document to write.
+ */
+public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
+
+    /**
+     * A {@link Document} along with the index it will be written to.
+     */
+    private class Indexable {
+        D document;
+        String index;
+
+        public Indexable(D document, String index) {
+            this.document = document;
+            this.index = index;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    /** The client used to submit bulk requests. */
+    private ElasticsearchClient client;
+
+    /** The batch of documents that have been added, but not yet written. */
+    private List<Indexable> documents;
+
+    /** The refresh policy applied to each bulk request. */
+    private WriteRequest.RefreshPolicy refreshPolicy;
+
+    /**
+     * Creates a writer with an empty batch and a refresh policy of
+     * {@link WriteRequest.RefreshPolicy#NONE}.
+     *
+     * @param client The client used to submit bulk requests.
+     */
+    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
+        this.client = client;
+        this.documents = new ArrayList<>();
+        this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    }
+
+    @Override
+    public void addDocument(D document, String indexName) {
+        documents.add(new Indexable(document, indexName));
+        LOG.debug("Adding document to batch; document={}, index={}", document, indexName);
+    }
+
+    /**
+     * Submits all documents in the batch as a single bulk request.
+     *
+     * <p>If the bulk request cannot be submitted because of an {@link IOException},
+     * every document in the batch is reported as a failure. The batch is always
+     * emptied, no matter which documents succeeded or failed.
+     *
+     * @return The documents that were written successfully and those that failed.
+     * @throws IllegalArgumentException If a document in the batch has no timestamp. The
+     *   batch is emptied in this case as well.
+     */
+    @Override
+    public BulkDocumentWriterResults<D> write() {
+        BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>();
+
+        // capture the batch size up-front; the batch is cleared before the summary is logged
+        int batchSize = documents.size();
+        try {
+            // create an index request for each document
+            BulkRequest bulkRequest = new BulkRequest();
+            bulkRequest.setRefreshPolicy(refreshPolicy);
+            for(Indexable doc: documents) {
+                DocWriteRequest request = createRequest(doc.document, doc.index);
+                bulkRequest.add(request);
+            }
+
+            // submit the request and handle the response
+            BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
+            handleBulkResponse(bulkResponse, documents, results);
+
+        } catch(IOException e) {
+            // assume all documents have failed
+            for(Indexable indexable: documents) {
+                D failed = indexable.document;
+                results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e));
+            }
+            LOG.error("Failed to submit bulk request; all documents failed", e);
+
+        } finally {
+            // flush all documents no matter which ones succeeded or failed
+            documents.clear();
+        }
+
+        LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
+                batchSize, results.getSuccesses().size(), results.getFailures().size());
+        return results;
+    }
+
+    @Override
+    public int size() {
+        return documents.size();
+    }
+
+    /**
+     * Sets the refresh policy applied to each bulk request.
+     *
+     * @param refreshPolicy The refresh policy to use.
+     * @return This writer, to allow calls to be chained.
+     */
+    public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
+        this.refreshPolicy = refreshPolicy;
+        return this;
+    }
+
+    /**
+     * Creates the request that indexes a single document.
+     *
+     * <p>The document type is the sensor type followed by "_doc" and the document
+     * identifier is the GUID.
+     *
+     * @param document The document to index.
+     * @param index The name of the index to write the document to.
+     * @return The request that will index the document.
+     * @throws IllegalArgumentException If the document has no timestamp.
+     */
+    private IndexRequest createRequest(D document, String index) {
+        if(document.getTimestamp() == null) {
+            throw new IllegalArgumentException("Document must contain the timestamp");
+        }
+        return new IndexRequest()
+                .source(document.getDocument())
+                .type(document.getSensorType() + "_doc")
+                .id(document.getGuid())
+                .index(index)
+                .timestamp(document.getTimestamp().toString());
+    }
+
+    /**
+     * Handles the {@link BulkResponse} received from Elasticsearch.
+     * @param bulkResponse The response received from Elasticsearch.
+     * @param documents The documents included in the bulk request.
+     * @param results The writer results.
+     */
+    private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) {
+        if (bulkResponse.hasFailures()) {
+
+            // interrogate the response to distinguish between those that succeeded and those that failed
+            for(BulkItemResponse response: bulkResponse) {
+                if(response.isFailed()) {
+                    // request failed
+                    D failed = getDocument(response.getItemId());
+                    Exception cause = response.getFailure().getCause();
+                    String message = response.getFailureMessage();
+                    results.addFailure(failed, cause, message);
+
+                } else {
+                    // request succeeded
+                    D success = getDocument(response.getItemId());
+                    results.addSuccess(success);
+                }
+            }
+        } else {
+            // all requests succeeded
+            for(Indexable success: documents) {
+                results.addSuccess(success.document);
+            }
+        }
+    }
+
+    /**
+     * Looks up a document in the current batch by its position.
+     *
+     * @param index The position of the document within the batch.
+     * @return The document at that position.
+     */
+    private D getDocument(int index) {
+        return documents.get(index).document;
+    }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
new file mode 100644
index 0000000..ac571c7
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document failed to be written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document that failed to write.
+ */
+public class WriteFailure <D extends Document> {
+
+  /** The document that failed to be written. */
+  private final D document;
+
+  /** The cause of the failure. */
+  private final Throwable cause;
+
+  /** A message describing the failure. */
+  private final String message;
+
+  /**
+   * @param document The document that failed to be written.
+   * @param cause The cause of the failure.
+   * @param message A message describing the failure.
+   */
+  public WriteFailure(D document, Throwable cause, String message) {
+    this.document = document;
+    this.cause = cause;
+    this.message = message;
+  }
+
+  /**
+   * @return The document that failed to be written.
+   */
+  public D getDocument() {
+    return this.document;
+  }
+
+  /**
+   * @return The cause of the failure.
+   */
+  public Throwable getCause() {
+    return this.cause;
+  }
+
+  /**
+   * @return A message describing the failure.
+   */
+  public String getMessage() {
+    return this.message;
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
new file mode 100644
index 0000000..a86325d
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document was successfully written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document written.
+ */
+public class WriteSuccess <D extends Document> {
+
+  /** The document that was written. */
+  private final D document;
+
+  /**
+   * @param document The document that was written.
+   */
+  public WriteSuccess(D document) {
+    this.document = document;
+  }
+
+  /**
+   * @return The document that was written.
+   */
+  public D getDocument() {
+    return this.document;
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 675d22f..7226c30 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -191,7 +191,7 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
-    return updateDao.getIndexName(guid, sensorType);
+    return updateDao.findIndexNameByGUID(guid, sensorType);
   }
 
   protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index ba852aa..fa02f8d 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -17,18 +17,9 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -36,84 +27,80 @@ import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
 public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
-  private WriteRequest.RefreshPolicy refreshPolicy;
+  private ElasticsearchBulkDocumentWriter<Document> documentWriter;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
-    this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
-    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
   }
 
   @Override
   public Document update(Document update, Optional<String> index) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
-    try {
-      IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    return update;
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+
+    Map<Document, Optional<String>> results = batchUpdate(updates);
+    return results.keySet().iterator().next();
   }
 
   @Override
   public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-
-    BulkRequest bulkRequestBuilder = new BulkRequest();
-    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
+    Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+    String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+    for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+      Document document = entry.getKey();
+      Optional<String> optionalIndex = entry.getValue();
+      String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix));
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage());
+    // write the documents. if any document fails, raise an exception.
+    BulkDocumentWriterResults<Document> results = documentWriter.write();
+    int failures = results.getFailures().size();
+    if(failures > 0) {
+      int successes = results.getSuccesses().size();
+      String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures);
+      LOG.error(msg);
+
+      // log each individual failure
+      for(WriteFailure<Document> failure: results.getFailures()) {
+        LOG.error(failure.getMessage(), failure.getCause());
+      }
+
+      // raise an exception using the first exception as the root cause, although there may be many
+      Throwable cause = results.getFailures().get(0).getCause();
+      throw new IOException(msg, cause);
     }
+
     return updates;
   }
 
@@ -187,32 +174,19 @@ public class ElasticsearchUpdateDao implements UpdateDao {
   }
 
   public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
-    this.refreshPolicy = refreshPolicy;
+    documentWriter.withRefreshPolicy(refreshPolicy);
     return this;
   }
 
-  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
-    return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
-        .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
-    );
+  protected String getIndexName(Document update, String indexPostFix) throws IOException {
+    return findIndexNameByGUID(update.getGuid(), update.getSensorType())
+            .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null));
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
-    return retrieveLatestDao.searchByGuid(guid,
-        sensorType,
-        hit -> Optional.ofNullable(hit.getIndex())
-    );
-  }
-
-  protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
-    String type = sensorType + "_doc";
-    Object ts = update.getTimestamp();
-    IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
-        .source(update.getDocument());
-    if (ts != null) {
-      indexRequest = indexRequest.timestamp(ts.toString());
-    }
-
-    return indexRequest;
+  protected Optional<String> findIndexNameByGUID(String guid, String sensorType) throws IOException {
+    return retrieveLatestDao.searchByGuid(
+            guid,
+            sensorType,
+            hit -> Optional.ofNullable(hit.getIndex()));
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index fbdd4fe..a3459d8 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,21 +17,24 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import com.google.common.collect.Lists;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.WriteSuccess;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +42,14 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static java.lang.String.format;
+import static org.apache.metron.stellar.common.Constants.Fields.TIMESTAMP;
+
 /**
  * A {@link BulkMessageWriter} that writes messages to Elasticsearch.
  */
@@ -57,89 +63,110 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
   private transient ElasticsearchClient client;
 
   /**
+   * Responsible for writing documents.
+   *
+   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
+   * a {@link Tuple} and the document created from the contents of that tuple. If
+   * a document cannot be written, the associated tuple needs to be failed.
+   */
+  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
+
+  /**
    * A simple data formatter used to build the appropriate Elasticsearch index name.
    */
   private SimpleDateFormat dateFormat;
 
-
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
-
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    client = ElasticsearchClientFactory.create(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
+
+    // only create the document writer if one does not already exist; this lets tests inject their own.
+    if(documentWriter == null) {
+      client = ElasticsearchClientFactory.create(globalConfiguration);
+      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
+    }
   }
 
   @Override
-  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public BulkWriterResponse write(String sensorType,
+                                  WriterConfiguration configurations,
+                                  Iterable<Tuple> tuplesIter,
+                                  List<JSONObject> messages) {
 
     // fetch the field name converter for this sensor type
     FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
+    String indexPostfix = dateFormat.format(new Date());
+    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+
+    // the number of tuples must match the number of messages
+    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
+    int batchSize = tuples.size();
+    if(messages.size() != batchSize) {
+      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
+              tuples.size(), messages.size()));
+    }
 
-    final String indexPostfix = dateFormat.format(new Date());
-    BulkRequest bulkRequest = new BulkRequest();
-    for(JSONObject message: messages) {
-
-      JSONObject esDoc = new JSONObject();
-      for(Object k : message.keySet()){
-        copyField(k.toString(), message, esDoc, fieldNameConverter);
-      }
-
-      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
-      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
-      indexRequest.source(esDoc.toJSONString());
-      String guid = (String)esDoc.get(Constants.GUID);
-      if(guid != null) {
-        indexRequest.id(guid);
-      }
-
-      Object ts = esDoc.get("timestamp");
-      if(ts != null) {
-        indexRequest.timestamp(ts.toString());
-      }
-      bulkRequest.add(indexRequest);
+    // create a document from each message
+    for(int i=0; i<tuples.size(); i++) {
+      JSONObject message = messages.get(i);
+      Tuple tuple = tuples.get(i);
+      TupleBasedDocument document = createDocument(message, tuple, sensorType, fieldNameConverter);
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
-    return buildWriteReponse(tuples, bulkResponse);
-  }
+    // write the documents
+    BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write();
 
-  @Override
-  public String getName() {
-    return "elasticsearch";
+    // build the response
+    BulkWriterResponse response = new BulkWriterResponse();
+    for(WriteSuccess<TupleBasedDocument> success: results.getSuccesses()) {
+      response.addSuccess(success.getDocument().getTuple());
+    }
+    for(WriteFailure<TupleBasedDocument> failure: results.getFailures()) {
+      response.addError(failure.getCause(), failure.getDocument().getTuple());
+    }
+    return response;
   }
 
-  protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
-    // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
-    BulkWriterResponse writerResponse = new BulkWriterResponse();
-    if (bulkResponse.hasFailures()) {
-      Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
-      Iterator<Tuple> tupleIter = tuples.iterator();
-      while (respIter.hasNext() && tupleIter.hasNext()) {
-        BulkItemResponse item = respIter.next();
-        Tuple tuple = tupleIter.next();
-
-        if (item.isFailed()) {
-          writerResponse.addError(item.getFailure().getCause(), tuple);
-        } else {
-          writerResponse.addSuccess(tuple);
-        }
-
-        // Should never happen, so fail the entire batch if it does.
-        if (respIter.hasNext() != tupleIter.hasNext()) {
-          throw new Exception(bulkResponse.buildFailureMessage());
-        }
-      }
+  private TupleBasedDocument createDocument(JSONObject message,
+                                            Tuple tuple,
+                                            String sensorType,
+                                            FieldNameConverter fieldNameConverter) {
+    // transform the message fields to the source fields of the indexed document
+    JSONObject source = new JSONObject();
+    for(Object k : message.keySet()){
+      copyField(k.toString(), message, source, fieldNameConverter);
+    }
+
+    // define the document id
+    String guid = ConversionUtils.convert(source.get(Constants.GUID), String.class);
+    if(guid == null) {
+      LOG.warn("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
+    }
+
+    // define the document timestamp
+    Long timestamp = null;
+    Object value = source.get(TIMESTAMP.getName());
+    if(value != null) {
+      timestamp = Long.parseLong(value.toString());
     } else {
-      writerResponse.addAllSuccesses(tuples);
+      LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName());
     }
 
-    return writerResponse;
+    return new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
+  }
+
+  @Override
+  public String getName() {
+    return "elasticsearch";
   }
 
   @Override
   public void close() throws Exception {
-    client.close();
+    if(client != null) {
+      client.close();
+    }
   }
 
   /**
@@ -167,5 +194,13 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
     // copy the field
     destination.put(destinationFieldName, source.get(sourceFieldName));
   }
+
+  /**
+   * Set the document writer.  Primarily used for testing.
+   * @param documentWriter The {@link BulkDocumentWriter} to use.
+   */
+  public void setDocumentWriter(BulkDocumentWriter<TupleBasedDocument> documentWriter) {
+    this.documentWriter = documentWriter;
+  }
 }
 
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
new file mode 100644
index 0000000..ba44937
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.writer;
+
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * A {@link Document} that is created from the contents of a {@link Tuple}.
+ */
+public class TupleBasedDocument extends Document {
+
+    private Tuple tuple;
+
+    public TupleBasedDocument(Map<String, Object> document,
+                              String guid,
+                              String sensorType,
+                              Long timestamp,
+                              Tuple tuple) {
+        super(document, guid, sensorType, timestamp);
+        this.tuple = tuple;
+    }
+
+    public Tuple getTuple() {
+        return tuple;
+    }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
new file mode 100644
index 0000000..b313811
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchBulkDocumentWriterTest {
+
+    ElasticsearchBulkDocumentWriter<Document> writer;
+    ElasticsearchClient client;
+    RestHighLevelClient highLevelClient;
+
+    @Before
+    public void setup() {
+        // mock Elasticsearch
+        highLevelClient = mock(RestHighLevelClient.class);
+        client = mock(ElasticsearchClient.class);
+        when(client.getHighLevelClient()).thenReturn(highLevelClient);
+
+        writer = new ElasticsearchBulkDocumentWriter<>(client);
+    }
+
+    @Test
+    public void testWriteSuccess() throws IOException {
+        setupElasticsearchToSucceed();
+
+        // write a document successfully
+        Document doc = document(message());
+        String index = "bro_index";
+        writer.addDocument(doc, index);
+
+        BulkDocumentWriterResults<Document> results = writer.write();
+        assertEquals(1, results.getSuccesses().size());
+        assertEquals(0, results.getFailures().size());
+
+        WriteSuccess<Document> success = results.getSuccesses().get(0);
+        assertEquals(doc, success.getDocument());
+    }
+
+    @Test
+    public void testWriteFailure() throws IOException {
+        setupElasticsearchToFail();
+
+        // the document will fail to write
+        Document doc = document(message());
+        String index = "bro_index";
+        writer.addDocument(doc, index);
+
+        BulkDocumentWriterResults<Document> results = writer.write();
+        assertEquals(0, results.getSuccesses().size());
+        assertEquals(1, results.getFailures().size());
+
+        WriteFailure<Document> failure = results.getFailures().get(0);
+        assertEquals(doc, failure.getDocument());
+        assertEquals("error message", failure.getMessage());
+        assertNotNull(failure.getCause());
+    }
+
+    @Test
+    public void testSizeWhenWriteSuccessful() throws IOException {
+        setupElasticsearchToSucceed();
+        assertEquals(0, writer.size());
+
+        // add some documents to write
+        String index = "bro_index";
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        assertEquals(5, writer.size());
+
+        // after the write, all documents should have been flushed
+        writer.write();
+        assertEquals(0, writer.size());
+    }
+
+    @Test
+    public void testSizeWhenWriteFails() throws IOException {
+        setupElasticsearchToFail();
+        assertEquals(0, writer.size());
+
+        // add some documents to write
+        String index = "bro_index";
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        assertEquals(5, writer.size());
+
+        // even though the write fails, all documents should still have been flushed
+        writer.write();
+        assertEquals(0, writer.size());
+    }
+
+    private void setupElasticsearchToFail() throws IOException {
+        // define the item failure
+        BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
+        when(failure.getCause()).thenReturn(new Exception("test exception"));
+        when(failure.getMessage()).thenReturn("error message");
+
+        // define the item level response
+        BulkItemResponse itemResponse = mock(BulkItemResponse.class);
+        when(itemResponse.isFailed()).thenReturn(true);
+        when(itemResponse.getItemId()).thenReturn(0);
+        when(itemResponse.getFailure()).thenReturn(failure);
+        when(itemResponse.getFailureMessage()).thenReturn("error message");
+        List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
+
+        // define the bulk response to indicate failure
+        BulkResponse response = mock(BulkResponse.class);
+        when(response.iterator()).thenReturn(itemsResponses.iterator());
+        when(response.hasFailures()).thenReturn(true);
+
+        // have the client return the mock response
+        when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+    }
+
+    private void setupElasticsearchToSucceed() throws IOException {
+        // define the bulk response to indicate success
+        BulkResponse response = mock(BulkResponse.class);
+        when(response.hasFailures()).thenReturn(false);
+
+        // have the client return the mock response
+        when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+    }
+
+    private Document document(JSONObject message) {
+        String guid = UUID.randomUUID().toString();
+        String sensorType = "bro";
+        Long timestamp = System.currentTimeMillis();
+        return new Document(message, guid, sensorType, timestamp);
+    }
+
+    private JSONObject message() {
+        JSONObject message = new JSONObject();
+        message.put(Constants.GUID, UUID.randomUUID().toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+        message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+        return message;
+    }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 227f5ef..dfdf88e 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -17,40 +17,11 @@
  */
 package org.apache.metron.elasticsearch.integration.components;
 
-import static java.util.Arrays.asList;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BooleanSupplier;
-
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.elasticsearch.client.ElasticsearchClient;
-import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
-import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.update.Document;
-import org.apache.metron.indexing.dao.update.UpdateDao;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -62,9 +33,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
@@ -76,11 +44,23 @@ import org.elasticsearch.node.NodeValidationException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.transport.Netty4Plugin;
-import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
 public class ElasticSearchComponent implements InMemoryComponent {
 
   private static class Mapping {
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 6a3638b..e5e85b0 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -18,170 +18,290 @@
 
 package org.apache.metron.elasticsearch.writer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.junit.Test;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ElasticsearchWriterTest {
-    @Test
-    public void testSingleSuccesses() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(false);
+    Map stormConf;
+    TopologyContext topologyContext;
+    WriterConfiguration writerConfiguration;
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addSuccess(tuple1);
+    @Before
+    public void setup() {
+        topologyContext = mock(TopologyContext.class);
 
-        ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
+        writerConfiguration = mock(WriterConfiguration.class);
+        when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
 
-        assertEquals("Response should have no errors and single success", expected, actual);
+        stormConf = new HashMap();
     }
 
     @Test
-    public void testMultipleSuccesses() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(false);
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addSuccess(tuple1);
-        expected.addSuccess(tuple2);
-
+    public void shouldWriteSuccessfully() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+
+        // create a document writer which will successfully write all documents
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        assertEquals("Response should have no errors and two successes", expected, actual);
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
     }
 
     @Test
-    public void testSingleFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
-
-        Exception e = new IllegalStateException();
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-
+    public void shouldWriteManySuccessfully() {
+        // create a few tuples and the messages associated with the tuples
+        List<Tuple> tuples = createTuples(3);
+        List<JSONObject> messages = createMessages(3);
+
+        // create a document writer which will successfully write all documents
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+        results.addSuccess(createDocument(messages.get(2), tuples.get(2)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
-
-        assertEquals("Response should have one error and zero successes", expected, actual);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(tuples.get(1)));
+        assertTrue(response.getSuccesses().contains(tuples.get(2)));
     }
 
     @Test
-    public void testTwoSameFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
-
-        Exception e = new IllegalStateException();
-
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        BulkItemResponse itemResponse2 = buildBulkItemFailure(e);
-
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addError(e, tuple2);
-
+    public void shouldHandleWriteFailure() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+        Exception cause = new Exception();
+
+        // create a document writer which will fail all writes
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
-
-        assertEquals("Response should have two errors and no successes", expected, actual);
-
-        // Ensure the errors actually get collapsed together
-        Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
-        HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
-        expectedErrors.put(e, ImmutableList.of(tuple1, tuple2));
-        assertEquals("Errors should have collapsed together", expectedErrors, actualErrors);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // the writer response should only contain failures
+        assertEquals(0, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        Collection<Tuple> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(tuples.get(0)));
     }
 
     @Test
-    public void testTwoDifferentFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
+    public void shouldHandleManyWriteFailures() {
+        // create a few tuples and the messages associated with the tuples
+        int count = 3;
+        List<Tuple> tuples = createTuples(count);
+        List<JSONObject> messages = createMessages(count);
+        Exception cause = new Exception();
+
+        // create a document writer which will fail all writes
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        results.addFailure(createDocument(messages.get(1), tuples.get(1)), cause, "error");
+        results.addFailure(createDocument(messages.get(2), tuples.get(2)), cause, "error");
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // the writer response should only contain failures
+        assertEquals(0, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        Collection<Tuple> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(tuples.get(0)));
+        assertTrue(errors.contains(tuples.get(1)));
+        assertTrue(errors.contains(tuples.get(2)));
+    }
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
+    @Test
+    public void shouldHandlePartialFailures() {
+        // create a few tuples and the messages associated with the tuples
+        int count = 2;
+        List<Tuple> tuples = createTuples(count);
+        List<JSONObject> messages = createMessages(count);
+        Exception cause = new Exception();
+
+        // create a document writer that will fail one and succeed the other
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // response should contain some successes and some failures
+        assertEquals(1, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        assertTrue(response.getErrors().get(cause).contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(tuples.get(1)));
+    }
 
-        Exception e = new IllegalStateException("Cause");
-        Exception e2 = new IllegalStateException("Different Cause");
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        BulkItemResponse itemResponse2 = buildBulkItemFailure(e2);
+    @Test(expected = IllegalStateException.class)
+    public void shouldCheckIfNumberOfMessagesMatchNumberOfTuples() {
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(mock(BulkDocumentWriter.class));
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
 
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
+        // there are 5 tuples and only 1 message; there should be 5 messages to match the number of tuples
+        List<Tuple> tuples = createTuples(5);
+        List<JSONObject> messages = createMessages(1);
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addError(e2, tuple2);
+        esWriter.write("bro", writerConfiguration, tuples, messages);
+        fail("expected exception");
+    }
 
+    @Test
+    public void shouldWriteSuccessfullyWhenMessageTimestampIsString() {
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+
+        // the timestamp is a String, rather than a Long
+        messages.get(0).put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString());
+
+        // create the document
+        JSONObject message = messages.get(0);
+        String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName());
+        String guid = (String) message.get(Constants.GUID);
+        String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+        TupleBasedDocument document = new TupleBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), tuples.get(0));
+
+        // create a document writer which will successfully write that document
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(document);
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        assertEquals("Response should have two errors and no successes", expected, actual);
-
-        // Ensure the errors did not get collapsed together
-        Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
-        HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
-        expectedErrors.put(e, ImmutableList.of(tuple1));
-        expectedErrors.put(e2, ImmutableList.of(tuple2));
-        assertEquals("Errors should not have collapsed together", expectedErrors, actualErrors);
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
     }
 
     @Test
-    public void testSuccessAndFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
+    public void shouldWriteSuccessfullyWhenMissingGUID() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
+        // remove the GUID from the message
+        assertNotNull(messages.get(0).remove(Constants.GUID));
 
-        Exception e = new IllegalStateException("Cause");
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
+        // create a document writer which will successfully write all
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
 
-        BulkItemResponse itemResponse2 = mock(BulkItemResponse.class);
-        when(itemResponse2.isFailed()).thenReturn(false);
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+    }
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addSuccess(tuple2);
+    private TupleBasedDocument createDocument(JSONObject message, Tuple tuple) {
+        Long timestamp = (Long) message.get(Constants.Fields.TIMESTAMP.getName());
+        String guid = (String) message.get(Constants.GUID);
+        String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+        return new TupleBasedDocument(message, guid, sensorType, timestamp, tuple);
+    }
 
-        ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+    private JSONObject message() {
+        JSONObject message = new JSONObject();
+        message.put(Constants.GUID, UUID.randomUUID().toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+        message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+        message.put(Constants.SENSOR_TYPE, "sensor");
+        return message;
+    }
+
+    private Map<String, Object> globals() {
+        Map<String, Object> globals = new HashMap<>();
+        globals.put("es.date.format", "yyyy.MM.dd.HH");
+        return globals;
+    }
 
-        assertEquals("Response should have one error and one success", expected, actual);
+    private List<Tuple> createTuples(int count) {
+        List<Tuple> tuples = new ArrayList<>();
+        for(int i=0; i<count; i++) {
+            tuples.add(mock(Tuple.class));
+        }
+        return tuples;
     }
 
-    private BulkItemResponse buildBulkItemFailure(Exception e) {
-        BulkItemResponse itemResponse = mock(BulkItemResponse.class);
-        when(itemResponse.isFailed()).thenReturn(true);
-        BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
-        when(itemResponse.getFailure()).thenReturn(failure);
-        when(failure.getCause()).thenReturn(e);
-        return itemResponse;
+    private List<JSONObject> createMessages(int count) {
+        List<JSONObject> messages = new ArrayList<>();
+        for(int i=0; i<count; i++) {
+            messages.add(message());
+        }
+        return messages;
     }
 }
diff --git a/metron-platform/metron-indexing/src/test/resources/log4j.properties b/metron-platform/metron-indexing/src/test/resources/log4j.properties
new file mode 100644
index 0000000..e69de29


Mime
View raw message