streams-commits mailing list archives

From sblack...@apache.org
Subject [38/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance
Date Fri, 25 Nov 2016 20:25:19 GMT
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
index 100b0c5..8fbbf3c 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
@@ -18,128 +18,197 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.streams.core.StreamsDatum;
 
+import com.fasterxml.jackson.databind.JsonNode;
+
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+/**
+ * Utility class for handling Elasticsearch Metadata maps.
+ */
 public class ElasticsearchMetadataUtil {
 
-    public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
+  /**
+   * get Index to use based on supplied parameters.
+   *
+   * @param metadata metadata
+   * @param config config
+   * @return result
+   */
+  public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
 
-        String index = null;
+    String index = null;
 
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) metadata.get("index");
-
-        if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-
-        return index;
+    if ( metadata != null && metadata.containsKey("index")) {
+      index = (String) metadata.get("index");
     }
 
-    public static String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
-
-        String type = null;
+    if ( index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+      index = config.getIndex();
+    }
 
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) metadata.get("type");
+    return index;
+  }
 
-        if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
-            type = config.getType();
-        }
+  /**
+   * get Index to use based on supplied parameters.
+   *
+   * @param metadata metadata
+   * @param config config
+   * @return result
+   */
+  public static String getIndex(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) {
 
+    String index = null;
 
-        return type;
+    if ( metadata != null && metadata.containsKey("index")) {
+      index = (String) metadata.get("index");
     }
 
-    public static String getIndex(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) {
+    if ( index == null ) {
+      index = config.getIndexes().get(0);
+    }
 
-        String index = null;
+    return index;
+  }
 
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) metadata.get("index");
+  /**
+   * get Type to use based on supplied parameters.
+   *
+   * @param metadata metadata
+   * @param config config
+   * @return result
+   */
+  public static String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) {
 
-        if(index == null) {
-            index = config.getIndexes().get(0);
-        }
+    String type = null;
 
-        return index;
+    if ( metadata != null && metadata.containsKey("type")) {
+      type = (String) metadata.get("type");
     }
 
-    public static String getType(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) {
+    if (type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) {
+      type = config.getType();
+    }
 
-        String type = null;
+    return type;
+  }
 
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) metadata.get("type");
+  /**
+   * get Type to use based on supplied parameters.
+   *
+   * @param metadata metadata
+   * @param config config
+   * @return result
+   */
+  public static String getType(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) {
 
-        if(type == null) {
-            type = config.getTypes().get(0);
-        }
+    String type = null;
 
+    if ( metadata != null && metadata.containsKey("type")) {
+      type = (String) metadata.get("type");
+    }
 
-        return type;
+    if (type == null) {
+      type = config.getTypes().get(0);
     }
 
-    public static String getId(StreamsDatum datum) {
 
-        String id = datum.getId();
+    return type;
+  }
+
+  /**
+   * get id to use based on supplied parameters.
+   *
+   * @param datum datum
+   * @return result
+   */
+  public static String getId(StreamsDatum datum) {
 
-        Map<String, Object> metadata = datum.getMetadata();
+    String id = datum.getId();
 
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) datum.getMetadata().get("id");
+    Map<String, Object> metadata = datum.getMetadata();
 
-        return id;
+    if ( id == null && metadata != null && metadata.containsKey("id")) {
+      id = (String) datum.getMetadata().get("id");
     }
 
-    static String getParent(StreamsDatum datum) {
+    return id;
+  }
 
-        String parent = null;
+  /**
+   * get id to use based on supplied parameters.
+   *
+   * @param metadata metadata
+   * @return result
+   */
+  public static String getId(Map<String, Object> metadata) {
 
-        Map<String, Object> metadata = datum.getMetadata();
+    return (String) metadata.get("id");
 
-        if(metadata != null && metadata.containsKey("parent"))
-            parent = (String) datum.getMetadata().get("parent");
+  }
 
-        return parent;
-    }
+  /**
+   * get parent id to use based on supplied parameters.
+   *
+   * @param datum datum
+   * @return result
+   */
+  static String getParent(StreamsDatum datum) {
 
-    static String getRouting(StreamsDatum datum) {
+    String parent = null;
 
-        String routing = null;
+    Map<String, Object> metadata = datum.getMetadata();
 
-        Map<String, Object> metadata = datum.getMetadata();
+    if (metadata != null && metadata.containsKey("parent")) {
+      parent = (String) datum.getMetadata().get("parent");
+    }
 
-        if(metadata != null && metadata.containsKey("routing"))
-            routing = (String) datum.getMetadata().get("routing");
+    return parent;
+  }
 
-        return routing;
-    }
+  /**
+   * get routing id to use based on supplied parameters.
+   *
+   * @param datum datum
+   * @return result
+   */
+  static String getRouting(StreamsDatum datum) {
 
-    public static String getId(Map<String, Object> metadata) {
+    String routing = null;
 
-        return (String) metadata.get("id");
+    Map<String, Object> metadata = datum.getMetadata();
 
+    if (metadata != null && metadata.containsKey("routing")) {
+      routing = (String) datum.getMetadata().get("routing");
     }
 
-    public static Map<String, Object> asMap(JsonNode node) {
+    return routing;
+  }
 
-        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
-        Map<String, Object> ret = new HashMap<>();
+  /**
+   * get JsonNode as Map.
+   * @param node node
+   * @return result
+   */
+  // TODO: move this to a utility package
+  public static Map<String, Object> asMap(JsonNode node) {
 
-        Map.Entry<String, JsonNode> entry;
+    Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+    Map<String, Object> ret = new HashMap<>();
 
-        while (iterator.hasNext()) {
-            entry = iterator.next();
-            if( entry.getValue().asText() != null )
-                ret.put(entry.getKey(), entry.getValue().asText());
-        }
+    Map.Entry<String, JsonNode> entry;
 
-        return ret;
+    while (iterator.hasNext()) {
+      entry = iterator.next();
+      if ( entry.getValue().asText() != null ) {
+        ret.put(entry.getKey(), entry.getValue().asText());
+      }
     }
+
+    return ret;
+  }
 }
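
For reference, a minimal sketch of how these helpers resolve values: per-datum metadata wins, and the writer configuration acts as the fallback (or the override when forceUseConfig is set). The setIndex/setType setters are assumed to exist on the generated ElasticsearchWriterConfiguration bean; they are not part of this diff.

import java.util.HashMap;
import java.util.Map;

import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

public class MetadataUtilSketch {
  public static void main(String[] args) {
    // the configuration supplies the defaults
    ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
    config.setIndex("activities");   // assumed setter on the generated config bean
    config.setType("activity");      // assumed setter on the generated config bean

    // per-datum metadata overrides the defaults unless forceUseConfig is true
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("index", "activities-2016-11");

    String index = ElasticsearchMetadataUtil.getIndex(metadata, config); // "activities-2016-11" (metadata wins)
    String type = ElasticsearchMetadataUtil.getType(metadata, config);   // "activity" (falls back to config)
    System.out.println(index + "/" + type);
  }
}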

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index af754ad..789b62f 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -18,90 +18,106 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.google.common.base.Preconditions;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+
+import com.google.common.base.Preconditions;
+
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+/**
+ * ElasticsearchPersistDeleter deletes documents from elasticsearch.
+ */
 public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
-    public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName();
+  public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName();
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
 
-    public ElasticsearchPersistDeleter() {
-        super();
-    }
+  public ElasticsearchPersistDeleter() {
+    super();
+  }
 
-    public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) {
-        super(config);
-    }
+  public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) {
+    super(config);
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
+  @Override
+  public void write(StreamsDatum streamsDatum) {
 
-        if(streamsDatum == null || streamsDatum.getDocument() == null)
-            return;
+    if ( streamsDatum == null || streamsDatum.getDocument() == null) {
+      return;
+    }
 
-        LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
+    LOGGER.debug("Delete Document: {}", streamsDatum.getDocument());
 
-        Map<String, Object> metadata = streamsDatum.getMetadata();
+    Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        LOGGER.debug("Delete Metadata: {}", metadata);
+    LOGGER.debug("Delete Metadata: {}", metadata);
 
-        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
-        String type = ElasticsearchMetadataUtil.getType(metadata, config);
-        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
+    String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+    String type = ElasticsearchMetadataUtil.getType(metadata, config);
+    String id = ElasticsearchMetadataUtil.getId(streamsDatum);
 
-        try {
-            delete(index, type, id);
-        } catch (Throwable e) {
-            LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage());
-        }
+    try {
+      delete(index, type, id);
+    } catch (Throwable ex) {
+      LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", ex.getMessage());
     }
+  }
 
-    public void delete(String index, String type, String id) {
-        DeleteRequest deleteRequest;
+  /**
+   * Prepare and enqueue a {@link org.elasticsearch.action.delete.DeleteRequest}.
+   * @param index index
+   * @param type type
+   * @param id id
+   */
+  public void delete(String index, String type, String id) {
+    DeleteRequest deleteRequest;
 
-        Preconditions.checkNotNull(index);
-        Preconditions.checkNotNull(id);
-        Preconditions.checkNotNull(type);
+    Preconditions.checkNotNull(index);
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(type);
 
-        // They didn't specify an ID, so we will create one for them.
-        deleteRequest = new DeleteRequest()
-                .index(index)
-                .type(type)
-                .id(id);
+    // Build the delete request from the supplied index, type, and id.
+    deleteRequest = new DeleteRequest()
+        .index(index)
+        .type(type)
+        .id(id);
 
-        add(deleteRequest);
+    add(deleteRequest);
 
-    }
-
-    public void add(DeleteRequest request) {
+  }
 
-        Preconditions.checkNotNull(request);
-        Preconditions.checkNotNull(request.index());
+  /**
+   * Enqueue DeleteRequest.
+   * @param request request
+   */
+  public void add(DeleteRequest request) {
 
-        // If our queue is larger than our flush threshold, then we should flush the queue.
-        synchronized (this) {
-            checkIndexImplications(request.index());
+    Preconditions.checkNotNull(request);
+    Preconditions.checkNotNull(request.index());
 
-            bulkRequest.add(request);
+    // If our queue is larger than our flush threshold, then we should flush the queue.
+    synchronized (this) {
+      checkIndexImplications(request.index());
 
-            currentBatchItems.incrementAndGet();
+      bulkRequest.add(request);
 
-            checkForFlush();
-        }
+      currentBatchItems.incrementAndGet();
 
+      checkForFlush();
     }
 
+  }
+
 }
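
A hedged usage sketch for the deleter: write() resolves index, type, and id from the datum metadata via ElasticsearchMetadataUtil and enqueues a DeleteRequest. The configuration setters and StreamsDatum#setMetadata are assumptions about the generated config bean and the core API, not shown in this diff.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchPersistDeleter;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

public class DeleterSketch {
  public static void main(String[] args) {
    ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
    config.setHosts(Collections.singletonList("localhost")); // assumed setters on the generated bean
    config.setPort(9300L);
    config.setClusterName("elasticsearch");
    config.setIndex("activities");
    config.setType("activity");

    ElasticsearchPersistDeleter deleter = new ElasticsearchPersistDeleter(config);

    // metadata carries the document coordinates used by write()
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("id", "datum-1");
    StreamsDatum datum = new StreamsDatum("{}");
    datum.setMetadata(metadata);   // assumed setter on StreamsDatum

    deleter.write(datum);
    deleter.cleanUp();             // flushes the outstanding bulk request
  }
}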

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
index 909f5c4..388497e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
@@ -18,14 +18,16 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Queues;
 import org.apache.streams.core.DatumStatusCounter;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Queues;
+
 import org.elasticsearch.search.SearchHit;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -44,187 +46,188 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+/**
+ * ElasticsearchPersistReader reads documents from elasticsearch.
+ */
 public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable {
 
-    public static final String STREAMS_ID = "ElasticsearchPersistReader";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    private ElasticsearchQuery elasticsearchQuery;
-    private ElasticsearchReaderConfiguration config;
-    private int threadPoolSize = 10;
-    private ExecutorService executor;
-    private ReadWriteLock lock = new ReentrantReadWriteLock();
-    private Future<?> readerTask;
-
-    public ElasticsearchPersistReader() {
+  public static final String STREAMS_ID = "ElasticsearchPersistReader";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ElasticsearchQuery elasticsearchQuery;
+  private ElasticsearchReaderConfiguration config;
+  private int threadPoolSize = 10;
+  private ExecutorService executor;
+  private ReadWriteLock lock = new ReentrantReadWriteLock();
+  private Future<?> readerTask;
+
+  public ElasticsearchPersistReader() {
+  }
+
+  public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) {
+    this.config = config;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  //PersistReader methods
+  @Override
+  public void startStream() {
+    LOGGER.debug("startStream");
+    executor = Executors.newSingleThreadExecutor();
+    readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
+  }
+
+  @Override
+  public void prepare(Object configuration) {
+    elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(config);
+    elasticsearchQuery.execute(configuration);
+    persistQueue = constructQueue();
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    try {
+      lock.writeLock().lock();
+      current = new StreamsResultSet(persistQueue);
+      current.setCounter(new DatumStatusCounter());
+      persistQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
     }
 
-    public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) {
-        this.config = config;
+    return current;
+
+  }
+
+  //TODO - This just reads current records and does not adjust any queries
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return readCurrent();
+  }
+
+  //TODO - This just reads current records and does not adjust any queries
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return readCurrent();
+  }
+
+  //If we still have data in the queue, we are still running
+  @Override
+  public boolean isRunning() {
+    return persistQueue.size() > 0 || (!readerTask.isDone() && !readerTask.isCancelled());
+  }
+
+  @Override
+  public void cleanUp() {
+    this.shutdownAndAwaitTermination(executor);
+    LOGGER.info("PersistReader done");
+    if ( elasticsearchQuery != null ) {
+      elasticsearchQuery.cleanUp();
     }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    //PersistReader methods
-    @Override
-    public void startStream() {
-        LOGGER.debug("startStream");
-        executor = Executors.newSingleThreadExecutor();
-        readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery));
-    }
-
-    @Override
-    public void prepare(Object o) {
-        elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(config);
-        elasticsearchQuery.execute(o);
-        persistQueue = constructQueue();
-    }
-
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
+  }
+
+  //The locking may appear to be counterintuitive but we really don't care if multiple threads offer to the queue
+  //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
+  //if the queue is being replaced with a new instance
+  protected void write(StreamsDatum entry) {
+    boolean success;
+    do {
+      try {
+        lock.readLock().lock();
+        success = persistQueue.offer(entry);
+        Thread.yield();
+      } finally {
+        lock.readLock().unlock();
+      }
     }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        try {
-            lock.writeLock().lock();
-            current = new StreamsResultSet(persistQueue);
-            current.setCounter(new DatumStatusCounter());
-//            current.getCounter().add(countersCurrent);
-//            countersTotal.add(countersCurrent);
-//            countersCurrent = new DatumStatusCounter();
-            persistQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
+    while (!success);
+  }
+
+  protected void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOGGER.error("Pool did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
-        return current;
+  private Queue<StreamsDatum> constructQueue() {
+    return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+  }
 
-    }
+  public static class ElasticsearchPersistReaderTask implements Runnable {
 
-    //TODO - This just reads current records and does not adjust any queries
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return readCurrent();
-    }
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
 
-    //TODO - This just reads current records and does not adjust any queries
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return readCurrent();
-    }
+    private ElasticsearchPersistReader reader;
+    private ElasticsearchQuery query;
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    //If we still have data in the queue, we are still running
-    @Override
-    public boolean isRunning() {
-        return persistQueue.size() > 0 || (!readerTask.isDone() && !readerTask.isCancelled());
+    public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) {
+      this.reader = reader;
+      this.query = query;
     }
 
     @Override
-    public void cleanUp() {
-        this.shutdownAndAwaitTermination(executor);
-        LOGGER.info("PersistReader done");
-        if(elasticsearchQuery != null) {
-            elasticsearchQuery.cleanUp();
-        }
-    }
+    public void run() {
 
-    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
-    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
-    //if the queue is being replaced with a new instance
-    protected void write(StreamsDatum entry) {
-        boolean success;
-        do {
-            try {
-                lock.readLock().lock();
-                success = persistQueue.offer(entry);
-                Thread.yield();
-            }finally {
-                lock.readLock().unlock();
-            }
-        }
-        while (!success);
-    }
-
-    protected void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
+      StreamsDatum item;
+      while (query.hasNext()) {
+        SearchHit hit = query.next();
+        ObjectNode jsonObject = null;
         try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    LOGGER.error("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    private Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
-    }
-
-    public static class ElasticsearchPersistReaderTask implements Runnable {
-
-        private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
-
-        private ElasticsearchPersistReader reader;
-        private ElasticsearchQuery query;
-        private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-        public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) {
-            this.reader = reader;
-            this.query = query;
+          jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+          item = new StreamsDatum(jsonObject, hit.getId());
+          item.getMetadata().put("id", hit.getId());
+          item.getMetadata().put("index", hit.getIndex());
+          item.getMetadata().put("type", hit.getType());
+          if ( hit.fields().containsKey("_timestamp")) {
+            DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue());
+            item.setTimestamp(timestamp);
+          }
+          if ( hit.fields().containsKey("_parent")) {
+            item.getMetadata().put("parent", hit.fields().get("_parent").value());
+          }
+          reader.write(item);
+        } catch (IOException ex) {
+          LOGGER.warn("Unable to process json source: ", hit.getSourceAsString());
         }
 
-        @Override
-        public void run() {
-
-            StreamsDatum item;
-            while (query.hasNext()) {
-                SearchHit hit = query.next();
-                ObjectNode jsonObject = null;
-                try {
-                    jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
-                    item = new StreamsDatum(jsonObject, hit.getId());
-                    item.getMetadata().put("id", hit.getId());
-                    item.getMetadata().put("index", hit.getIndex());
-                    item.getMetadata().put("type", hit.getType());
-                    if( hit.fields().containsKey("_timestamp")) {
-                        DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue());
-                        item.setTimestamp(timestamp);
-                    }
-                    if( hit.fields().containsKey("_parent")) {
-                        item.getMetadata().put("parent", hit.fields().get("_parent").value());
-                    }
-                    reader.write(item);
-                } catch (IOException e) {
-                    LOGGER.warn("Unable to process json source: ", hit.getSourceAsString());
-                }
-
-            }
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) {
-                LOGGER.warn("Thread interrupted", e);
-            }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(100));
+      } catch (InterruptedException ex) {
+        LOGGER.warn("Thread interrupted", ex);
+      }
 
-        }
     }
+  }
 }
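
A hedged sketch of driving the reader outside a stream runner, using the lifecycle methods above (prepare, startStream, readCurrent, isRunning, cleanUp). The setIndexes/setTypes setters on the generated ElasticsearchReaderConfiguration bean are assumed, and StreamsResultSet is assumed to be iterable over StreamsDatum.

import java.util.Collections;

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.elasticsearch.ElasticsearchPersistReader;
import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;

public class ReaderSketch {
  public static void main(String[] args) throws InterruptedException {
    ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();
    config.setIndexes(Collections.singletonList("activities")); // assumed setter on the generated bean
    config.setTypes(Collections.singletonList("activity"));     // assumed setter on the generated bean

    ElasticsearchPersistReader reader = new ElasticsearchPersistReader(config);
    reader.prepare(null);   // builds the ElasticsearchQuery and the internal queue
    reader.startStream();   // kicks off the background ElasticsearchPersistReaderTask

    while (reader.isRunning()) {
      StreamsResultSet batch = reader.readCurrent(); // swaps in a fresh queue and returns the drained one
      for (StreamsDatum datum : batch) {             // assumed iterable
        System.out.println(datum.getMetadata().get("id"));
      }
      Thread.sleep(100);
    }
    reader.cleanUp();
  }
}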
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index f712248..f4da436 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -18,111 +18,131 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
 import org.elasticsearch.action.update.UpdateRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+/**
+ * ElasticsearchPersistUpdater updates documents to elasticsearch.
+ */
 public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter {
 
-    public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName();
+  public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName();
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
 
-    public ElasticsearchPersistUpdater() {
-        super();
-    }
+  public ElasticsearchPersistUpdater() {
+    super();
+  }
 
-    public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
-        super(config);
-    }
+  public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
+    super(config);
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
+  @Override
+  public void write(StreamsDatum streamsDatum) {
 
-        if(streamsDatum == null || streamsDatum.getDocument() == null)
-            return;
+    if (streamsDatum == null || streamsDatum.getDocument() == null) {
+      return;
+    }
 
-        LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
+    LOGGER.debug("Update Document: {}", streamsDatum.getDocument());
 
-        Map<String, Object> metadata = streamsDatum.getMetadata();
+    Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        LOGGER.debug("Update Metadata: {}", metadata);
+    LOGGER.debug("Update Metadata: {}", metadata);
 
-        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
-        String type = ElasticsearchMetadataUtil.getType(metadata, config);
-        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
-        String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
-        String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum);
+    String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+    String type = ElasticsearchMetadataUtil.getType(metadata, config);
+    String id = ElasticsearchMetadataUtil.getId(streamsDatum);
+    String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
+    String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum);
 
-        try {
+    try {
 
-            String docAsJson = docAsJson(streamsDatum.getDocument());
+      String docAsJson = docAsJson(streamsDatum.getDocument());
 
-            LOGGER.debug("Attempt Update: ({},{},{},{},{}) {}", index, type, id, parent, routing, docAsJson);
+      LOGGER.debug("Attempt Update: ({},{},{},{},{}) {}", index, type, id, parent, routing, docAsJson);
 
-            update(index, type, id, parent, routing, docAsJson);
+      update(index, type, id, parent, routing, docAsJson);
 
-        } catch (Throwable e) {
-            LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage());
-        }
+    } catch (Throwable ex) {
+      LOGGER.warn("Unable to Update Document in ElasticSearch: {}", ex.getMessage());
+    }
+  }
+
+  /**
+   * Prepare and enqueue.
+   * @see org.elasticsearch.action.update.UpdateRequest
+   * @param indexName indexName
+   * @param type type
+   * @param id id
+   * @param parent parent
+   * @param routing routing
+   * @param json json
+   */
+  public void update(String indexName, String type, String id, String parent, String routing, String json) {
+    UpdateRequest updateRequest;
+
+    Preconditions.checkNotNull(id);
+    Preconditions.checkNotNull(json);
+
+    // Build the update request from the supplied coordinates and the partial document.
+    updateRequest = new UpdateRequest()
+        .index(indexName)
+        .type(type)
+        .id(id)
+        .doc(json);
+
+    if (!Strings.isNullOrEmpty(parent)) {
+      updateRequest = updateRequest.parent(parent);
     }
 
-    public void update(String indexName, String type, String id, String parent, String routing, String json) {
-        UpdateRequest updateRequest;
-
-        Preconditions.checkNotNull(id);
-        Preconditions.checkNotNull(json);
-
-        // They didn't specify an ID, so we will create one for them.
-        updateRequest = new UpdateRequest()
-                .index(indexName)
-                .type(type)
-                .id(id)
-                .doc(json);
-
-        if(!Strings.isNullOrEmpty(parent)) {
-            updateRequest = updateRequest.parent(parent);
-        }
-
-        if(!Strings.isNullOrEmpty(routing)) {
-            updateRequest = updateRequest.routing(routing);
-        }
+    if (!Strings.isNullOrEmpty(routing)) {
+      updateRequest = updateRequest.routing(routing);
+    }
 
-        // add fields
-        //updateRequest.docAsUpsert(true);
+    // add fields
+    //updateRequest.docAsUpsert(true);
 
-        add(updateRequest);
+    add(updateRequest);
 
-    }
+  }
 
-    public void add(UpdateRequest request) {
+  /**
+   * Enqueue UpdateRequest.
+   * @param request request
+   */
+  public void add(UpdateRequest request) {
 
-        Preconditions.checkNotNull(request);
-        Preconditions.checkNotNull(request.index());
+    Preconditions.checkNotNull(request);
+    Preconditions.checkNotNull(request.index());
 
-        // If our queue is larger than our flush threshold, then we should flush the queue.
-        synchronized (this) {
-            checkIndexImplications(request.index());
+    // If our queue is larger than our flush threshold, then we should flush the queue.
+    synchronized (this) {
+      checkIndexImplications(request.index());
 
-            bulkRequest.add(request);
+      bulkRequest.add(request);
 
-            currentBatchBytes.addAndGet(request.doc().source().length());
-            currentBatchItems.incrementAndGet();
-
-            checkForFlush();
-        }
+      currentBatchBytes.addAndGet(request.doc().source().length());
+      currentBatchItems.incrementAndGet();
 
+      checkForFlush();
     }
 
+  }
+
 }
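
A hedged sketch for partial updates: a JSON fragment plus the document id in metadata is enough for write() to build the UpdateRequest, with index and type falling back to the configuration. As above, the configuration setters and StreamsDatum#setMetadata are assumptions about APIs outside this diff.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

public class UpdaterSketch {
  public static void main(String[] args) {
    ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
    config.setHosts(Collections.singletonList("localhost")); // assumed setters on the generated bean
    config.setPort(9300L);
    config.setClusterName("elasticsearch");
    config.setIndex("activities");
    config.setType("activity");

    ElasticsearchPersistUpdater updater = new ElasticsearchPersistUpdater(config);

    // only the fields present in the partial document are updated
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("id", "datum-1");
    StreamsDatum datum = new StreamsDatum("{\"verb\":\"share\"}");
    datum.setMetadata(metadata);   // assumed setter on StreamsDatum

    updater.write(datum);
    updater.cleanUp();             // flushes the outstanding bulk request
  }
}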

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 8f9c7d7..07ab734 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -19,14 +19,16 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -55,495 +57,581 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+/**
+ * ElasticsearchPersistWriter writes documents to elasticsearch.
+ */
 public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable {
 
-    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
+  public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
-    private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
-    private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
-    private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L;
-    private static final int DEFAULT_BATCH_SIZE = 100;
-    //ES defaults its bulk index queue to 50 items.  We want to be under this on our backoff so set this to 1/2 ES default
-    //at a batch size as configured here.
-    private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25;
-    //A document should have to wait no more than 10s to get flushed
-    private static final long DEFAULT_MAX_WAIT = 10000;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
+  private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
+  private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
+  private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L;
+  private static final int DEFAULT_BATCH_SIZE = 100;
+  //ES defaults its bulk index queue to 50 items.  We want to be under this on our backoff so set this to 1/2 ES default
+  //at a batch size as configured here.
+  private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25;
+  //A document should have to wait no more than 10s to get flushed
+  private static final long DEFAULT_MAX_WAIT = 10000;
 
-    protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+  protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected final List<String> affectedIndexes = new ArrayList<>();
+  protected final List<String> affectedIndexes = new ArrayList<>();
 
-    protected final ElasticsearchClientManager manager;
-    protected final ElasticsearchWriterConfiguration config;
+  protected final ElasticsearchClientManager manager;
+  protected final ElasticsearchWriterConfiguration config;
 
-    protected BulkRequestBuilder bulkRequest;
+  protected BulkRequestBuilder bulkRequest;
 
-    private boolean veryLargeBulk = false;  // by default this setting is set to false
-    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
-    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
+  private boolean veryLargeBulk = false;  // by default this setting is set to false
+  private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
+  private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
 
-    private long flushThresholdTime = DEFAULT_MAX_WAIT;
-    private long lastFlush = new Date().getTime();
-    private Timer timer = new Timer();
+  private long flushThresholdTime = DEFAULT_MAX_WAIT;
+  private long lastFlush = new Date().getTime();
+  private Timer timer = new Timer();
 
 
-    private final AtomicInteger batchesSent = new AtomicInteger(0);
-    private final AtomicInteger batchesResponded = new AtomicInteger(0);
+  private final AtomicInteger batchesSent = new AtomicInteger(0);
+  private final AtomicInteger batchesResponded = new AtomicInteger(0);
 
-    protected final AtomicLong currentBatchItems = new AtomicLong(0);
-    protected final AtomicLong currentBatchBytes = new AtomicLong(0);
+  protected final AtomicLong currentBatchItems = new AtomicLong(0);
+  protected final AtomicLong currentBatchBytes = new AtomicLong(0);
 
-    private final AtomicLong totalSent = new AtomicLong(0);
-    private final AtomicLong totalSeconds = new AtomicLong(0);
-    private final AtomicLong totalOk = new AtomicLong(0);
-    private final AtomicLong totalFailed = new AtomicLong(0);
-    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
+  private final AtomicLong totalSent = new AtomicLong(0);
+  private final AtomicLong totalSeconds = new AtomicLong(0);
+  private final AtomicLong totalOk = new AtomicLong(0);
+  private final AtomicLong totalFailed = new AtomicLong(0);
+  private final AtomicLong totalSizeInBytes = new AtomicLong(0);
 
-    public ElasticsearchPersistWriter() {
-        this(new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
-    }
-
-    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
-        this(config, new ElasticsearchClientManager(config));
-    }
+  public ElasticsearchPersistWriter() {
+    this(new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
+  }
 
-    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
-        this.config = config;
-        this.manager = manager;
-        this.bulkRequest = this.manager.getClient().prepareBulk();
-    }
+  public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
+    this(config, new ElasticsearchClientManager(config));
+  }
 
-    public long getBatchesSent()                            { return this.batchesSent.get(); }
-    public long getBatchesResponded()                       { return batchesResponded.get(); }
+  /**
+   * ElasticsearchPersistWriter constructor.
+   * @param config config
+   * @param manager manager
+   */
+  public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
+    this.config = config;
+    this.manager = manager;
+    this.bulkRequest = this.manager.getClient().prepareBulk();
+  }
 
+  public long getBatchesSent() {
+    return this.batchesSent.get();
+  }
 
-    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
-    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
-    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
+  public long getBatchesResponded() {
+    return batchesResponded.get();
+  }
 
-    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
-    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
-    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
-    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
+  public long getFlushThresholdsRecords() {
+    return this.flushThresholdsRecords;
+  }
 
-    private long getLastFlush()                             { return this.lastFlush; }
+  public long getFlushThresholdBytes() {
+    return this.flushThresholdBytes;
+  }
 
-    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
-    public long getTotalSent()                              { return this.totalSent.get(); }
-    public long getTotalOk()                                { return this.totalOk.get(); }
-    public long getTotalFailed()                            { return this.totalFailed.get(); }
-    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
-    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
-    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
+  public long getFlushThreasholdMaxTime() {
+    return this.flushThresholdTime;
+  }
 
-    public boolean isConnected()                            { return (this.manager.getClient() != null); }
+  public void setFlushThresholdRecords(long val) {
+    this.flushThresholdsRecords = val;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public void setFlushThresholdBytes(long val) {
+    this.flushThresholdBytes = val;
+  }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
-        if(streamsDatum == null || streamsDatum.getDocument() == null)
-            return;
+  public void setFlushThreasholdMaxTime(long val) {
+    this.flushThresholdTime = val;
+  }
 
-        checkForBackOff();
+  public void setVeryLargeBulk(boolean veryLargeBulk) {
+    this.veryLargeBulk = veryLargeBulk;
+  }
 
-        LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
+  private long getLastFlush() {
+    return this.lastFlush;
+  }
+
+  public long getTotalOutstanding() {
+    return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get());
+  }
 
-        Map<String, Object> metadata = streamsDatum.getMetadata();
+  public long getTotalSent() {
+    return this.totalSent.get();
+  }
 
-        LOGGER.debug("Write Metadata: {}", metadata);
+  public long getTotalOk() {
+    return this.totalOk.get();
+  }
 
-        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
-        String type = ElasticsearchMetadataUtil.getType(metadata, config);
-        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
-        String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
-        String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum);
+  public long getTotalFailed() {
+    return this.totalFailed.get();
+  }
 
-        try {
-            streamsDatum = appendMetadata(streamsDatum);
-            String docAsJson = docAsJson(streamsDatum.getDocument());
-            add(index, type, id, parent, routing,
-                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
-                    docAsJson);
-        } catch (Throwable e) {
-            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
-        }
+  public long getTotalSizeInBytes() {
+    return this.totalSizeInBytes.get();
+  }
+
+  public long getTotalSeconds() {
+    return this.totalSeconds.get();
+  }
+
+  public List<String> getAffectedIndexes() {
+    return this.affectedIndexes;
+  }
+
+  public boolean isConnected() {
+    return (this.manager.getClient() != null);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void write(StreamsDatum streamsDatum) {
+
+    if (streamsDatum == null || streamsDatum.getDocument() == null) {
+      return;
     }
 
-    protected String docAsJson(Object streamsDocument) throws IOException {
-        return (streamsDocument instanceof String) ? streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument);
-    }
+    checkForBackOff();
 
-    protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException {
+    LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
 
-        String docAsJson = (streamsDatum.getDocument() instanceof String) ? streamsDatum.getDocument().toString() : OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+    Map<String, Object> metadata = streamsDatum.getMetadata();
 
-        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
-            return streamsDatum;
-        else {
-            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
-            node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
-            streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(node));
-            return streamsDatum;
-        }
+    LOGGER.debug("Write Metadata: {}", metadata);
+
+    String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+    String type = ElasticsearchMetadataUtil.getType(metadata, config);
+    String id = ElasticsearchMetadataUtil.getId(streamsDatum);
+    String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
+    String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum);
+
+    try {
+      streamsDatum = appendMetadata(streamsDatum);
+      String docAsJson = docAsJson(streamsDatum.getDocument());
+      add(index, type, id, parent, routing,
+          streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
+          docAsJson);
+    } catch (Throwable ex) {
+      LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", ex.getMessage());
     }
+  }
 
-    public void cleanUp() {
+  protected String docAsJson(Object streamsDocument) throws IOException {
+    return (streamsDocument instanceof String) ? streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument);
+  }
 
-        try {
+  protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException {
 
-            LOGGER.debug("cleanUp started");
+    String docAsJson = (streamsDatum.getDocument() instanceof String) ? streamsDatum.getDocument().toString() : OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
 
-            // before they close, check to ensure that
-            flushInternal();
+    if (streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0) {
+      return streamsDatum;
+    } else {
+      ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
+      node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
+      streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(node));
+      return streamsDatum;
+    }
+  }
 
-            LOGGER.debug("flushInternal completed");
+  @Override
+  public void cleanUp() {
 
-            waitToCatchUp(0, 5 * 60 * 1000);
+    try {
 
-            LOGGER.debug("waitToCatchUp completed");
+      LOGGER.debug("cleanUp started");
 
-        } catch (Throwable e) {
-            // this line of code should be logically unreachable.
-            LOGGER.warn("This is unexpected: {}", e);
-        } finally {
+      // before they close, check to ensure that
+      flushInternal();
 
-            if(veryLargeBulk) {
-                resetRefreshInterval();
-            }
+      LOGGER.debug("flushInternal completed");
 
-            if( config.getRefresh() ) {
-                refreshIndexes();
-                LOGGER.debug("refreshIndexes completed");
-            }
+      waitToCatchUp(0, 5 * 60 * 1000);
 
-            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]",
-              this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
-            timer.cancel();
+      LOGGER.debug("waitToCatchUp completed");
 
-            LOGGER.debug("cleanUp completed");
-        }
-    }
+    } catch (Throwable ex) {
+      // this line of code should be logically unreachable.
+      LOGGER.warn("This is unexpected: {}", ex);
+    } finally {
 
-    private void resetRefreshInterval() {
-        for (String indexName : this.affectedIndexes) {
-
-            if (this.veryLargeBulk) {
-                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
-                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
-                // refreshing back on.
-                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-                updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", "5s"));
-
-                // submit to ElasticSearch
-                this.manager.getClient()
-                        .admin()
-                        .indices()
-                        .updateSettings(updateSettingsRequest)
-                        .actionGet();
-            }
-        }
-    }
+      if (veryLargeBulk) {
+        resetRefreshInterval();
+      }
 
-    private void refreshIndexes() {
-        for (String indexName : this.affectedIndexes) {
-
-            if (config.getRefresh()) {
-                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
-                this.manager.getClient()
-                        .admin()
-                        .indices()
-                        .prepareRefresh(indexName)
-                        .execute()
-                        .actionGet();
-            }
-        }
-    }
+      if ( config.getRefresh() ) {
+        refreshIndexes();
+        LOGGER.debug("refreshIndexes completed");
+      }
 
-    private synchronized void flushInternal() {
-        // we do not have a working bulk request, we can just exit here.
-        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
-            return;
+      LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]",
+          this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
+      timer.cancel();
 
-        // wait for one minute to catch up if it needs to
-        waitToCatchUp(5, 60 * 1000);
+      LOGGER.debug("cleanUp completed");
+    }
+  }
+
+  private void resetRefreshInterval() {
+    for (String indexName : this.affectedIndexes) {
+
+      if (this.veryLargeBulk) {
+        LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
+        // They are in 'very large bulk' mode and the process is finished. We now want to turn the
+        // refreshing back on.
+        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+        updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", "5s"));
+
+        // submit to ElasticSearch
+        this.manager.getClient()
+            .admin()
+            .indices()
+            .updateSettings(updateSettingsRequest)
+            .actionGet();
+      }
+    }
+  }
 
-        // call the flush command.
-        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
+  private void refreshIndexes() {
 
-        // reset the current batch statistics
-        this.currentBatchItems.set(0);
-        this.currentBatchBytes.set(0);
+    for (String indexName : this.affectedIndexes) {
 
-        // reset our bulk request builder
-        this.bulkRequest = this.manager.getClient().prepareBulk();
+      if (config.getRefresh()) {
+        LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+        this.manager.getClient()
+            .admin()
+            .indices()
+            .prepareRefresh(indexName)
+            .execute()
+            .actionGet();
+      }
     }
+  }
 
-    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
-        int counter = 0;
-        // If we still have 5 batches outstanding, we need to give it a minute to catch up
-        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
-            try {
-                Thread.yield();
-                Thread.sleep(1);
-                counter++;
-            } catch(InterruptedException ie) {
-                LOGGER.warn("Catchup was interrupted.  Data may be lost");
-                return;
-            }
-        }
+  private synchronized void flushInternal() {
+    // If we do not have a working bulk request, we can just exit here.
+    if (this.bulkRequest == null || this.currentBatchItems.get() == 0) {
+      return;
     }
 
-    private void checkForBackOff() {
-        try {
-            if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
-                /*
-                 * Author:
-                 * Smashew
-                 *
-                 * Date:
-                 * 2013-10-20
-                 *
-                 * Note:
-                 * With the information that we have on hand. We need to develop a heuristic
-                 * that will determine when the cluster is having a problem indexing records
-                 * by telling it to pause and wait for it to catch back up. A
-                 *
-                 * There is an impact to us, the caller, whenever this happens as well. Items
-                 * that are not yet fully indexed by the server sit in a queue, on the client
-                 * that can cause the heap to overflow. This has been seen when re-indexing
-                 * large amounts of data to a small cluster. The "deletes" + "indexes" can
-                 * cause the server to have many 'outstandingItems" in queue. Running this
-                 * software with large amounts of data, on a small cluster, can re-create
-                 * this problem.
-                 *
-                 * DO NOT DELETE THESE LINES
-                 ****************************************************************************/
-
-                // wait for the flush to catch up. We are going to cap this at
-                int count = 0;
-                while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
-                    Thread.sleep(10);
-
-                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
-                    LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
-            }
-        } catch (Exception e) {
-            LOGGER.warn("We were broken from our loop: {}", e.getMessage());
+    // wait up to one minute for outstanding batches to catch up if needed
+    waitToCatchUp(5, 60 * 1000);
+
+    // call the flush command.
+    flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
+
+    // reset the current batch statistics
+    this.currentBatchItems.set(0);
+    this.currentBatchBytes.set(0);
+
+    // reset our bulk request builder
+    this.bulkRequest = this.manager.getClient().prepareBulk();
+  }
+
+  private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
+    int counter = 0;
+    // If more than batchThreshold batches are still outstanding, poll in ~1 ms sleeps until
+    // they catch up or roughly timeOutThresholdInMS milliseconds have elapsed.
+    while (this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
+      try {
+        Thread.yield();
+        Thread.sleep(1);
+        counter++;
+      } catch (InterruptedException ie) {
+        LOGGER.warn("Catchup was interrupted.  Data may be lost");
+        return;
+      }
+    }
+  }
+
+  private void checkForBackOff() {
+    try {
+      if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
+        /*
+         * Author:
+         * Smashew
+         *
+         * Date:
+         * 2013-10-20
+         *
+         * Note:
+         * With the information we have on hand, we use a simple heuristic to decide when the
+         * cluster is having trouble indexing records: tell the caller to pause and wait for
+         * the cluster to catch back up.
+         *
+         * There is an impact on us, the caller, whenever this happens as well. Items that are
+         * not yet fully indexed by the server sit in a queue on the client, which can cause
+         * the heap to overflow. This has been seen when re-indexing large amounts of data to a
+         * small cluster: the "deletes" + "indexes" can leave the server with many outstanding
+         * items in its queue. Running this software with large amounts of data on a small
+         * cluster can re-create this problem.
+         *
+         * DO NOT DELETE THESE LINES
+         ****************************************************************************/
+
+        // wait for the flush to catch up. We cap this at 500 iterations of 10 ms (about 5 seconds).
+        int count = 0;
+        while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) {
+          Thread.sleep(10);
         }
+        if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
+          LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
+        }
+      }
+    } catch (Exception ex) {
+      LOGGER.warn("We were broken from our loop: {}", ex.getMessage());
     }
-
-    public void add(String indexName, String type, String id, String ts, String json) {
-        add(indexName, type, id, null, null, ts, json);
+  }
+
+  /**
+   * add based on supplied parameters.
+   * @param indexName indexName
+   * @param type type
+   * @param id id
+   * @param ts ts
+   * @param json json
+   */
+  public void add(String indexName, String type, String id, String ts, String json) {
+    add(indexName, type, id, null, null, ts, json);
+  }
+
+  /**
+   * add based on supplied parameters.
+   * @param indexName indexName
+   * @param type type
+   * @param id id
+   * @param parent parent
+   * @param routing routing
+   * @param ts ts
+   * @param json json
+   */
+  public void add(String indexName, String type, String id, String parent, String routing, String ts, String json) {
+
+    // make sure that these are not null
+    Preconditions.checkNotNull(indexName);
+    Preconditions.checkNotNull(type);
+    Preconditions.checkNotNull(json);
+
+    IndexRequestBuilder indexRequestBuilder = manager.getClient()
+        .prepareIndex(indexName, type)
+        .setSource(json);
+
+    // If the caller did not specify an ID, Elasticsearch will generate one for them.
+    if (id != null) {
+      indexRequestBuilder.setId(id);
     }
-
-    public void add(String indexName, String type, String id, String parent, String routing, String ts, String json) {
-
-        // make sure that these are not null
-        Preconditions.checkNotNull(indexName);
-        Preconditions.checkNotNull(type);
-        Preconditions.checkNotNull(json);
-
-        IndexRequestBuilder indexRequestBuilder = manager.getClient()
-                .prepareIndex(indexName, type)
-                .setSource(json);
-
-        // / They didn't specify an ID, so we will create one for them.
-        if(id != null)
-            indexRequestBuilder.setId(id);
-
-        if(ts != null)
-            indexRequestBuilder.setTimestamp(ts);
-
-        if(parent != null)
-            indexRequestBuilder.setParent(parent);
-
-        if(routing != null)
-            indexRequestBuilder.setRouting(routing);
-
-        add(indexRequestBuilder.request());
+    if (ts != null) {
+      indexRequestBuilder.setTimestamp(ts);
+    }
+    if (parent != null) {
+      indexRequestBuilder.setParent(parent);
     }
+    if (routing != null) {
+      indexRequestBuilder.setRouting(routing);
+    }
+    add(indexRequestBuilder.request());
+  }
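+
+  /*
+   * Illustrative usage sketch of the two add(...) overloads above. The index, type, id,
+   * routing and JSON values are placeholders, and the config-taking constructor is assumed
+   * rather than guaranteed by this class:
+   *
+   *   ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config);
+   *   writer.prepare(null);
+   *   writer.add("activity", "post", "doc-1", null, "{\"verb\":\"post\"}");
+   *   writer.add("activity", "post", "doc-2", "parent-1", "route-1", null, "{\"verb\":\"share\"}");
+   *   writer.cleanUp();
+   */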
 
-    protected void add(IndexRequest request) {
+  protected void add(IndexRequest request) {
 
-        Preconditions.checkNotNull(request);
-        Preconditions.checkNotNull(request.index());
+    Preconditions.checkNotNull(request);
+    Preconditions.checkNotNull(request.index());
 
-        // If our queue is larger than our flush threshold, then we should flush the queue.
-        synchronized (this) {
-            checkIndexImplications(request.index());
+    // If our queue is larger than our flush threshold, then we should flush the queue.
+    synchronized (this) {
+      checkIndexImplications(request.index());
 
-            bulkRequest.add(request);
+      bulkRequest.add(request);
 
-            this.currentBatchBytes.addAndGet(request.source().length());
-            this.currentBatchItems.incrementAndGet();
+      this.currentBatchBytes.addAndGet(request.source().length());
+      this.currentBatchItems.incrementAndGet();
 
-            checkForFlush();
-        }
+      checkForFlush();
     }
-
-    protected void checkForFlush() {
-        synchronized (this) {
-            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
-                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
-                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
-                // We should flush
-                flushInternal();
-            }
-        }
+  }
+
+  protected void checkForFlush() {
+    synchronized (this) {
+      if (this.currentBatchBytes.get() >= this.flushThresholdBytes
+          || this.currentBatchItems.get() >= this.flushThresholdsRecords
+          || new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
+        // We should flush
+        flushInternal();
+      }
     }
+  }
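+
+  /*
+   * Flush-trigger summary (sketch; the actual limits are whatever prepare() assigned to
+   * flushThresholdBytes, flushThresholdsRecords and flushThresholdTime):
+   *
+   *   currentBatchBytes >= flushThresholdBytes     -> flush (a few very large documents)
+   *   currentBatchItems >= flushThresholdsRecords  -> flush (many small documents)
+   *   now - lastFlush   >= flushThresholdTime      -> flush (a slow trickle, caught by the timer)
+   */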
 
-    protected void checkIndexImplications(String indexName) {
-        // We need this to be safe across all writers that are currently being executed
-        synchronized (ElasticsearchPersistWriter.class) {
+  protected void checkIndexImplications(String indexName) {
+    // We need this to be safe across all writers that are currently being executed
+    synchronized (ElasticsearchPersistWriter.class) {
 
-            // this will be common if we have already verified the index.
-            if (this.affectedIndexes.contains(indexName))
-                return;
+      // this will be common if we have already verified the index.
+      if (this.affectedIndexes.contains(indexName)) {
+        return;
+      }
 
+      // create the index if it is missing
+      createIndexIfMissing(indexName);
 
-            // create the index if it is missing
-            createIndexIfMissing(indexName);
+      // remember this index so we do not check it again.
+      this.affectedIndexes.add(indexName);
 
-            // we haven't log this index.
-            this.affectedIndexes.add(indexName);
-
-        }
     }
-
-    protected void disableRefresh() {
-
-        for (String indexName : this.affectedIndexes) {
-            // They are in 'very large bulk' mode we want to turn off refreshing the index.
-            // Create a request then add the setting to tell it to stop refreshing the interval
-            UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-            updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", -1));
-
-            // submit to ElasticSearch
-            this.manager.getClient()
-                    .admin()
-                    .indices()
-                    .updateSettings(updateSettingsRequest)
-                    .actionGet();
-        }
+  }
+
+  protected void disableRefresh() {
+
+    for (String indexName : this.affectedIndexes) {
+      // They are in 'very large bulk' mode, so we want to turn off refreshing of the index.
+      // Create a request and add the setting that tells Elasticsearch to stop refreshing it.
+      UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+      updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", -1));
+
+      // submit to ElasticSearch
+      this.manager.getClient()
+          .admin()
+          .indices()
+          .updateSettings(updateSettingsRequest)
+          .actionGet();
     }
-
-    public void createIndexIfMissing(String indexName) {
-        // Synchronize this on a static class level
-        if (!this.manager.getClient()
-                .admin()
-                .indices()
-                .exists(new IndicesExistsRequest(indexName))
-                .actionGet()
-                .isExists())
-        {
-            // It does not exist... So we are going to need to create the index.
-            // we are going to assume that the 'templates' that we have loaded into
-            // elasticsearch are sufficient to ensure the index is being created properly.
-            CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
-
-            if (response.isAcknowledged()) {
-                LOGGER.info("Index Created: {}", indexName);
-            } else {
-                LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
-                LOGGER.error("Error Message: {}", response.toString());
-                throw new RuntimeException("Unable to create index " + indexName);
-            }
-        }
+  }
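+
+  /*
+   * Descriptive note: when config.getBulk() is true, prepare() calls disableRefresh() to set
+   * refresh_interval to -1 on every affected index so Elasticsearch does not refresh segments
+   * during a very large bulk load, and cleanUp() later calls resetRefreshInterval() to restore
+   * a 5s refresh interval.
+   */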
+
+  /**
+   * Create the index if it does not already exist.
+   * @param indexName indexName
+   */
+  public void createIndexIfMissing(String indexName) {
+    // Callers are expected to synchronize at the class level (see checkIndexImplications).
+    if (!this.manager.getClient()
+        .admin()
+        .indices()
+        .exists(new IndicesExistsRequest(indexName))
+        .actionGet()
+        .isExists()) {
+      // It does not exist, so we need to create the index.
+      // We assume that the templates already loaded into Elasticsearch are sufficient
+      // to ensure the index is created properly.
+      CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+      if (response.isAcknowledged()) {
+        LOGGER.info("Index Created: {}", indexName);
+      } else {
+        LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+        LOGGER.error("Error Message: {}", response.toString());
+        throw new RuntimeException("Unable to create index " + indexName);
+      }
     }
-
-    public void prepare(Object configurationObject) {
-        this.veryLargeBulk = config.getBulk() == null ?
-                Boolean.FALSE :
-                config.getBulk();
-
-        this.flushThresholdsRecords = config.getBatchSize() == null ?
-                DEFAULT_BATCH_SIZE :
-                (int)(config.getBatchSize().longValue());
-
-        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
-                config.getMaxTimeBetweenFlushMs() :
-                DEFAULT_MAX_WAIT;
-
-        this.flushThresholdBytes = config.getBatchBytes() == null ?
-                DEFAULT_BULK_FLUSH_THRESHOLD :
-                config.getBatchBytes();
-
-        timer.scheduleAtFixedRate(new TimerTask() {
-            public void run() {
-                checkForFlush();
-            }
-        }, this.flushThresholdTime, this.flushThresholdTime);
-
-        if( veryLargeBulk )
-            disableRefresh();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    this.veryLargeBulk = config.getBulk() == null
+        ? Boolean.FALSE
+        : config.getBulk();
+
+    this.flushThresholdsRecords = config.getBatchSize() == null
+        ? DEFAULT_BATCH_SIZE
+        : (int)(config.getBatchSize().longValue());
+
+    this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0
+        ? config.getMaxTimeBetweenFlushMs()
+        : DEFAULT_MAX_WAIT;
+
+    this.flushThresholdBytes = config.getBatchBytes() == null
+        ? DEFAULT_BULK_FLUSH_THRESHOLD
+        : config.getBatchBytes();
+
+    timer.scheduleAtFixedRate(new TimerTask() {
+      public void run() {
+        checkForFlush();
+      }
+    }, this.flushThresholdTime, this.flushThresholdTime);
+
+    if ( veryLargeBulk ) {
+      disableRefresh();
     }
+  }
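+
+  /*
+   * Configuration sketch for the thresholds above. The setter names are assumed to mirror the
+   * getters used in prepare() (the configuration bean is generated, so the exact API may
+   * differ), and the numbers are illustrative only:
+   *
+   *   ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
+   *   config.setBulk(true);                     // very large bulk: refresh disabled until cleanUp()
+   *   config.setBatchSize(1000L);               // flush every 1000 documents ...
+   *   config.setBatchBytes(5L * 1024 * 1024);   // ... or every 5 MB ...
+   *   config.setMaxTimeBetweenFlushMs(30000L);  // ... or at least every 30 seconds
+   */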
 
-    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
-        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
+  private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
+    LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
 
 
-        // record the last time we flushed the index
-        this.lastFlush = new Date().getTime();
+    // record the last time we flushed the index
+    this.lastFlush = new Date().getTime();
 
-        // add the totals
-        this.totalSent.addAndGet(sent);
+    // add the totals
+    this.totalSent.addAndGet(sent);
 
-        // add the total number of batches sent
-        this.batchesSent.incrementAndGet();
+    // add the total number of batches sent
+    this.batchesSent.incrementAndGet();
 
-        try {
-            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
-                public void onResponse(BulkResponse bulkItemResponses) {
-                    batchesResponded.incrementAndGet();
-                    updateTotals(bulkItemResponses, sent, sizeInBytes);
-                }
-
-                public void onFailure(Throwable throwable) {
-                    batchesResponded.incrementAndGet();
-                    throwable.printStackTrace();
-                }
-            });
-        }
-        catch(Throwable e) {
-            LOGGER.error("There was an error sending the batch: {}", e.getMessage());
+    try {
+      bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+        public void onResponse(BulkResponse bulkItemResponses) {
+          batchesResponded.incrementAndGet();
+          updateTotals(bulkItemResponses, sent, sizeInBytes);
         }
-    }
 
-    private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) {
-        long failed = 0;
-        long passed = 0;
-        long millis = bulkItemResponses.getTookInMillis();
-
-        // keep track of the number of totalFailed and items that we have totalOk.
-        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-            if (resp == null || resp.isFailed()) {
-                failed++;
-                LOGGER.debug("{} ({},{},{}) failed: {}", resp.getOpType(), resp.getIndex(), resp.getType(), resp.getId(), resp.getFailureMessage());
-            }
-            else
-                passed++;
+        public void onFailure(Throwable throwable) {
+          batchesResponded.incrementAndGet();
+          throwable.printStackTrace();
         }
+      });
+    } catch (Throwable ex) {
+      LOGGER.error("There was an error sending the batch: {}", ex.getMessage());
+    }
+  }
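+
+  /*
+   * Descriptive note: the bulk request executes asynchronously. batchesResponded is incremented
+   * on both success and failure; on failure the stack trace is printed and the batch is not
+   * retried by this writer.
+   */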
+
+  private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) {
+    long failed = 0;
+    long passed = 0;
+    long millis = bulkItemResponses.getTookInMillis();
+
+    // keep track of the number of items that failed (totalFailed) and the number that succeeded (totalOk).
+    for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+      if (resp == null || resp.isFailed()) {
+        failed++;
+        if (resp != null) {
+          LOGGER.debug("{} ({},{},{}) failed: {}", resp.getOpType(), resp.getIndex(), resp.getType(), resp.getId(), resp.getFailureMessage());
+        }
+      } else {
+        passed++;
+      }
+    }
 
-        if (failed > 0)
-            LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent);
-
-        this.totalOk.addAndGet(passed);
-        this.totalFailed.addAndGet(failed);
-        this.totalSeconds.addAndGet(millis / 1000);
-        this.totalSizeInBytes.addAndGet(sizeInBytes);
+    if (failed > 0) {
+      LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent);
+    }
 
-        if (sent != (passed + failed))
-            LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed);
+    this.totalOk.addAndGet(passed);
+    this.totalFailed.addAndGet(failed);
+    this.totalSeconds.addAndGet(millis / 1000);
+    this.totalSizeInBytes.addAndGet(sizeInBytes);
 
-        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
-                MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
-                MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+    if (sent != (passed + failed)) {
+      LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed);
     }
 
+    LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
+        MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
+        MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
+  }
+
 }

