From: mfrank...@apache.org
Subject: [02/12] git commit: Fixed the ElasticSearchPersistWriter
Date: Wed, 02 Jul 2014 16:13:53 GMT
Fixed the ElasticSearchPersistWriter


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1d36ab61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1d36ab61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1d36ab61

Branch: refs/heads/instagram
Commit: 1d36ab61e1046ec8c197a9be23b3234fed4cf56c
Parents: 34c95a6
Author: Matthew Hager <Matthew.Hager@gmail.com>
Authored: Thu Jun 26 15:16:24 2014 -0500
Committer: Matthew Hager <Matthew.Hager@gmail.com>
Committed: Thu Jun 26 15:16:24 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +
 .../ElasticsearchPersistWriter.java             | 693 ++++++++-----------
 2 files changed, 283 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index af4e360..4507fb5 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@ -77,6 +77,10 @@ public class ElasticsearchConfigurator {
         if( elasticsearch.hasPath("batchSize"))
             elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
 
+        if( elasticsearch.hasPath("batchBytes"))
+            elasticsearchWriterConfiguration.setBatchBytes(elasticsearch.getLong("batchBytes"));
+
+
         elasticsearchWriterConfiguration.setIndex(index);
         elasticsearchWriterConfiguration.setType(type);
         elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
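----------------------------------------------------------------------
For illustration only (not part of the commit): the hunk above reads an optional
"batchBytes" key the same way "batchSize" is read. Below is a minimal,
self-contained sketch of that pattern using the Typesafe Config API; in a stream
configuration this block would live under the "elasticsearch" path that
ElasticsearchConfigurator consumes, and the value shown (5242880) is just an
example 5 MB threshold.

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class BatchBytesSketch {
        public static void main(String[] args) {
            // hypothetical snippet of the "elasticsearch" config block
            Config elasticsearch = ConfigFactory.parseString("batchBytes = 5242880");

            // same optional-key pattern the configurator uses above
            if (elasticsearch.hasPath("batchBytes"))
                System.out.println(elasticsearch.getLong("batchBytes"));  // prints 5242880
        }
    }
----------------------------------------------------------------------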

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1d36ab61/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index da5916b..95f5f57 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -16,20 +16,18 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.streams.elasticsearch;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Objects;
-import com.google.common.base.Optional;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -39,321 +37,227 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.client.Client;
+import org.elasticsearch.common.joda.time.DateTime;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.index.query.IdsQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.json.JSONException;
-import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonParser;
 
-import java.io.Closeable;
-import java.io.Flushable;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
+import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
-    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
 
-    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
+    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
     private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
     private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
     private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
     private static final long WAITING_DOCS_LIMIT = 10000;
-    private static final int BYTES_IN_MB = 1024 * 1024;
-    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
     private static final long DEFAULT_MAX_WAIT = 10000;
     private static final int DEFAULT_BATCH_SIZE = 100;
 
+    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+
     private final List<String> affectedIndexes = new ArrayList<String>();
-    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
-    //Primary lock for preventing multiple synchronous batches with the same data
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    //Create independent locks to synchronize updates that have nothing to do with actually sending data
-    private final Object countLock = new Object();
-    private final Object requestLock = new Object();
-
-    private ObjectMapper mapper = new StreamsJacksonMapper();
-    private ElasticsearchClientManager manager;
-    private ElasticsearchWriterConfiguration config;
-    private Client client;
-    private String parentID = null;
+
+    private final ElasticsearchClientManager manager;
+    private final ElasticsearchWriterConfiguration config;
+
     private BulkRequestBuilder bulkRequest;
-    private OutputStreamWriter currentWriter = null;
-    private int batchSize;
-    private long maxTimeBetweenFlushMs;
+
     private boolean veryLargeBulk = false;  // by default this setting is set to false
+    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
+    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
 
-    protected Thread task;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
-
-    private volatile int currentItems = 0;
-    private volatile int totalSent = 0;
-    private volatile int totalSeconds = 0;
-    private volatile int totalAttempted = 0;
-    private volatile int totalOk = 0;
-    private volatile int totalFailed = 0;
-    private volatile int totalBatchCount = 0;
-    private volatile int totalRecordsWritten = 0;
-    private volatile long totalSizeInBytes = 0;
-    private volatile long batchSizeInBytes = 0;
-    private volatile int batchItemsSent = 0;
-    private volatile int totalByteCount = 0;
-    private volatile int byteCount = 0;
-    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+    private long flushThresholdTime = DEFAULT_MAX_WAIT;
+    private long lastFlush = new Date().getTime();
+    private Timer timer = new Timer();
 
-    public ElasticsearchPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
-    }
 
-    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
-        this.config = config;
-    }
+    private final AtomicInteger batchesSent = new AtomicInteger(0);
+    private final AtomicInteger batchesResponded = new AtomicInteger(0);
 
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
+    private final AtomicLong currentBatchItems = new AtomicLong(0);
+    private final AtomicLong currentBatchBytes = new AtomicLong(0);
 
-    public void setVeryLargeBulk(boolean veryLargeBulk) {
-        this.veryLargeBulk = veryLargeBulk;
-    }
+    private final AtomicLong totalSent = new AtomicLong(0);
+    private final AtomicLong totalSeconds = new AtomicLong(0);
+    private final AtomicLong totalOk = new AtomicLong(0);
+    private final AtomicLong totalFailed = new AtomicLong(0);
+    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
 
-    public int getTotalOutstanding() {
-        return this.totalSent - (this.totalFailed + this.totalOk);
-    }
-
-    public long getFlushThresholdSizeInBytes() {
-        return flushThresholdSizeInBytes;
+    public ElasticsearchPersistWriter() {
+        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
     }
 
-    public int getTotalSent() {
-        return totalSent;
+    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
+        this(config, new ElasticsearchClientManager(config));
     }
 
-    public int getTotalSeconds() {
-        return totalSeconds;
+    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
+        this.config = config;
+        this.manager = manager;
+        this.bulkRequest = this.manager.getClient().prepareBulk();
     }
 
-    public int getTotalOk() {
-        return totalOk;
-    }
+    public long getBatchesSent()                            { return this.batchesSent.get(); }
+    public long getBatchesResponded()                       { return batchesResponded.get(); }
 
-    public int getTotalFailed() {
-        return totalFailed;
-    }
 
-    public int getTotalBatchCount() {
-        return totalBatchCount;
-    }
+    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
+    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
+    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
 
-    public long getTotalSizeInBytes() {
-        return totalSizeInBytes;
-    }
+    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
+    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
+    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
+    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
 
-    public long getBatchSizeInBytes() {
-        return batchSizeInBytes;
-    }
+    private long getLastFlush()                             { return this.lastFlush; }
 
-    public int getBatchItemsSent() {
-        return batchItemsSent;
-    }
+    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
+    public long getTotalSent()                              { return this.totalSent.get(); }
+    public long getTotalOk()                                { return this.totalOk.get(); }
+    public long getTotalFailed()                            { return this.totalFailed.get(); }
+    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
+    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
+    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
 
-    public List<String> getAffectedIndexes() {
-        return this.affectedIndexes;
-    }
+    public boolean isConnected()                            { return (this.manager.getClient() != null); }
 
-    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
-        this.flushThresholdSizeInBytes = sizeInBytes;
-    }
+    @Override
+    public void write(StreamsDatum streamsDatum) {
+        if(streamsDatum == null || streamsDatum.getDocument() == null)
+            return;
 
-    public long getMaxTimeBetweenFlushMs() {
-        return maxTimeBetweenFlushMs;
-    }
+        checkForBackOff();
 
-    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
-        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
+        try {
+            add(config.getIndex(), config.getType(), streamsDatum.getId(),
+                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
+                    convertAndAppendMetadata(streamsDatum));
+        } catch (Throwable e) {
+            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
+        }
     }
 
-    public boolean isConnected() {
-        return (client != null);
-    }
 
-    @Override
-    public void write(StreamsDatum streamsDatum) {
+    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
+        Object object = streamsDatum.getDocument();
 
-        String json;
-        String id = null;
-        String ts = null;
-        try {
-            if( streamsDatum.getId() != null ) {
-                id = streamsDatum.getId();
-            }
-            if( streamsDatum.getTimestamp() != null ) {
-                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
+        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
+        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
+            return docAsJson;
+        else {
+            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
+            try {
+                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
             }
-            if (streamsDatum.getDocument() instanceof String)
-                json = streamsDatum.getDocument().toString();
-            else {
-                json = mapper.writeValueAsString(streamsDatum.getDocument());
+            catch(Throwable e) {
+                LOGGER.warn("Unable to write metadata");
             }
-
-            add(config.getIndex(), config.getType(), id, ts, json);
-
-        } catch (Exception e) {
-            LOGGER.warn("{} {}", e.getMessage());
-            e.printStackTrace();
+            return OBJECT_MAPPER.writeValueAsString(node);
         }
     }
 
     public void cleanUp() {
-
         try {
-            flush();
-            backgroundFlushTask.shutdownNow();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        close();
-    }
 
-    @Override
-    public void close() {
-        try {
             // before they close, check to ensure that
-            this.flush();
-
-            this.lock.writeLock().lock();
-
-            int count = 0;
-            // We are going to give it 5 minutes.
-            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
-                for(ListenableActionFuture<BulkResponse> future : responses) {
-                    if(future.isDone() || future.isCancelled()) {
-                        BulkResponse response = future.get();
-                        LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
-                        updateTotals(response, 0, 0);
-                    }
-                }
-                Thread.sleep(50);
-            }
-
-            if (this.getTotalOutstanding() > 0) {
-                LOGGER.error("We never cleared our buffer");
-            }
+            flushInternal();
 
+            waitToCatchUp(0, 5 * 60 * 1000);
+            refreshIndexes();
 
-            for (String indexName : this.getAffectedIndexes()) {
-                createIndexIfMissing(indexName);
+            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
 
-                if (this.veryLargeBulk) {
-                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
-                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
-                    // refreshing back on.
-                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
+        } catch (Throwable e) {
+            // this line of code should be logically unreachable.
+            LOGGER.warn("This is unexpected: {}", e.getMessage());
+            e.printStackTrace();
+        }
+    }
 
-                    // submit to ElasticSearch
-                    this.manager.getClient()
-                            .admin()
-                            .indices()
-                            .updateSettings(updateSettingsRequest)
-                            .actionGet();
-                }
+    private void refreshIndexes() {
+        for (String indexName : this.affectedIndexes) {
 
-                checkIndexImplications(indexName);
+            if (this.veryLargeBulk) {
+                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
+                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
+                // refreshing back on.
+                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
 
-                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+                // submit to ElasticSearch
                 this.manager.getClient()
                         .admin()
                         .indices()
-                        .prepareRefresh(indexName)
-                        .execute()
+                        .updateSettings(updateSettingsRequest)
                         .actionGet();
             }
 
-            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
+            checkIndexImplications(indexName);
 
-        } catch (Exception e) {
-            // this line of code should be logically unreachable.
-            LOGGER.warn("This is unexpected: {}", e.getMessage());
-            e.printStackTrace();
-        } finally {
-            this.lock.writeLock().unlock();
+            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
+            this.manager.getClient()
+                    .admin()
+                    .indices()
+                    .prepareRefresh(indexName)
+                    .execute()
+                    .actionGet();
         }
     }
 
     @Override
-    public void flush() throws IOException {
-        flushInternal();
-    }
-
-    @Override
     public DatumStatusCounter getDatumStatusCounter() {
         DatumStatusCounter counters = new DatumStatusCounter();
-        counters.incrementAttempt(this.batchItemsSent);
-        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
-        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
+        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
+        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
         return counters;
     }
 
-    public void start() {
-        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                LOGGER.debug("Checking to see if data needs to be flushed");
-                long time = System.currentTimeMillis() - lastWrite.get();
-                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
-                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
-                    flushInternal();
-                }
-            }
-        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
-        manager = new ElasticsearchClientManager(config);
-        client = manager.getClient();
-
-        LOGGER.info(client.toString());
-    }
-
-    public void flushInternal() {
-        lock.writeLock().lock();
+    private synchronized void flushInternal() {
         // we do not have a working bulk request, we can just exit here.
-        if (this.bulkRequest == null || batchItemsSent == 0)
+        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
             return;
 
+        // wait for one minute to catch up if it needs to
+        waitToCatchUp(5, 1 * 60 * 1000);
+
         // call the flush command.
-        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
+        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
 
-        // null the flush request, this will be created in the 'add' function below
-        this.bulkRequest = null;
+        // reset the current batch statistics
+        this.currentBatchItems.set(0);
+        this.currentBatchBytes.set(0);
 
-        // record the proper statistics, and add it to our totals.
-        this.totalSizeInBytes += this.batchSizeInBytes;
-        this.totalSent += batchItemsSent;
+        // reset our bulk request builder
+        this.bulkRequest = this.manager.getClient().prepareBulk();
+    }
 
-        // reset the current batch statistics
-        this.batchSizeInBytes = 0;
-        this.batchItemsSent = 0;
+    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
+        int counter = 0;
+        // If we still have 5 batches outstanding, we need to give it a minute to catch up
+        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
+            try {
+                Thread.yield();
+                Thread.sleep(1);
+                timeOutThresholdInMS++;
+            } catch(InterruptedException ie) {
+                // No Operation
+            }
+        }
+    }
 
+    private void checkForBackOff() {
         try {
-            int count = 0;
             if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                 /****************************************************************************
                  * Author:
@@ -379,6 +283,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                  ****************************************************************************/
 
                 // wait for the flush to catch up. We are going to cap this at
+                int count = 0;
                 while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
                     Thread.sleep(10);
 
@@ -386,29 +291,20 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
                     LOGGER.warn("Even after back-off there are {} items still in queue.",
this.getTotalOutstanding());
             }
         } catch (Exception e) {
-            LOGGER.info("We were broken from our loop: {}", e.getMessage());
-        } finally {
-            lock.writeLock().unlock();
+            LOGGER.warn("We were broken from our loop: {}", e.getMessage());
         }
-
-    }
-
-    public void add(String indexName, String type, String json) {
-        add(indexName, type, null, null, json);
-    }
-
-    public void add(String indexName, String type, String id, String json) {
-        add(indexName, type, id, null, json);
     }
 
-    public void add(String indexName, String type, String id, String ts, String json)
-    {
-        IndexRequestBuilder indexRequestBuilder = new IndexRequestBuilder(manager.getClient());
+    public void add(String indexName, String type, String id, String ts, String json) {
 
-        indexRequestBuilder.setIndex(indexName);
-        indexRequestBuilder.setType(type);
+        // make sure that these are not null
+        Preconditions.checkNotNull(indexName);
+        Preconditions.checkNotNull(type);
+        Preconditions.checkNotNull(json);
 
-        indexRequestBuilder.setSource(json);
+        IndexRequestBuilder indexRequestBuilder = manager.getClient()
+                .prepareIndex(indexName, type)
+                .setSource(json);
 
         // / They didn't specify an ID, so we will create one for them.
         if(id != null)
@@ -417,114 +313,109 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         if(ts != null)
             indexRequestBuilder.setTimestamp(ts);
 
-        // If there is a parentID that is associated with this bulk, then we are
-        // going to have to parse the raw JSON and attempt to dereference
-        // what the parent document should be
-        if (parentID != null) {
-            try {
-                // The JSONObject constructor can throw an exception, it is called
-                // out explicitly here so we can catch it.
-                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
-            }
-            catch(JSONException e)
-            {
-                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
-                totalFailed++;
-            }
-        }
         add(indexRequestBuilder.request());
     }
 
-    public void add(UpdateRequest updateRequest) {
-        Preconditions.checkNotNull(updateRequest);
-        lock.writeLock().lock();
+    /**
+     *  This function is trashed... needs to be fixed.
+     *
+    private synchronized void add(UpdateRequest request) {
+        Preconditions.checkNotNull(request);
         checkAndCreateBulkRequest();
-        checkIndexImplications(updateRequest.index());
-        bulkRequest.add(updateRequest);
+
+        checkIndexImplications(request.index());
+
+        bulkRequest.add(request);
         try {
             Optional<Integer> size = Objects.firstNonNull(
-                    Optional.fromNullable(updateRequest.doc().source().length()),
-                    Optional.fromNullable(updateRequest.script().length()));
+                    Optional.fromNullable(request.doc().source().length()),
+                    Optional.fromNullable(request.script().length()));
             trackItemAndBytesWritten(size.get().longValue());
         } catch (NullPointerException x) {
             trackItemAndBytesWritten(1000);
-        } finally {
-            lock.writeLock().unlock();
         }
     }
+    */
 
-    public void add(IndexRequest indexRequest) {
-        lock.writeLock().lock();
-        checkAndCreateBulkRequest();
-        checkIndexImplications(indexRequest.index());
-        bulkRequest.add(indexRequest);
-        try {
-            trackItemAndBytesWritten(indexRequest.source().length());
-        } catch (NullPointerException x) {
-            LOGGER.warn("NPE adding/sizing indexrequest");
-        } finally {
-            lock.writeLock().unlock();
+    protected void add(IndexRequest request) {
+
+        Preconditions.checkNotNull(request);
+        Preconditions.checkNotNull(request.index());
+
+        // If our queue is larger than our flush threshold, then we should flush the queue.
+        synchronized (this) {
+            checkIndexImplications(request.index());
+
+            bulkRequest.add(request);
+
+            this.currentBatchBytes.addAndGet(request.source().length());
+            this.currentBatchItems.incrementAndGet();
+
+            checkForFlush();
         }
     }
 
-    private void trackItemAndBytesWritten(long sizeInBytes)
-    {
-        currentItems++;
-        batchItemsSent++;
-        batchSizeInBytes += sizeInBytes;
-
-        // If our queue is larger than our flush threashold, then we should flush the queue.
-        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
-                (currentItems >= batchSize) ) {
-            flushInternal();
-            this.currentItems = 0;
+    private void checkForFlush() {
+        synchronized (this) {
+            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
+                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
+                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
+                // We should flush
+                flushInternal();
+            }
         }
     }
 
-    private void checkIndexImplications(String indexName)
-    {
+    private void checkIndexImplications(String indexName) {
+        // We need this to be safe across all writers that are currently being executed
+        synchronized (ElasticsearchPersistWriter.class) {
 
-        // check to see if we have seen this index before.
-        if(this.affectedIndexes.contains(indexName))
-            return;
+            // this will be common if we have already verified the index.
+            if (this.affectedIndexes.contains(indexName))
+                return;
 
-        // we haven't log this index.
-        this.affectedIndexes.add(indexName);
 
-        // Check to see if we are in 'veryLargeBulk' mode
-        // if we aren't, exit early
-        if(!this.veryLargeBulk)
-            return;
+            // create the index if it is missing
+            createIndexIfMissing(indexName);
 
+            // we haven't log this index.
+            this.affectedIndexes.add(indexName);
 
-        // They are in 'very large bulk' mode we want to turn off refreshing the index.
+            // Check to see if we are in 'veryLargeBulk' mode
+            // if we aren't, exit early
+            if (this.veryLargeBulk) {
 
-        // Create a request then add the setting to tell it to stop refreshing the interval
-        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
-        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
+                // They are in 'very large bulk' mode we want to turn off refreshing the index.
+                // Create a request then add the setting to tell it to stop refreshing the interval
+                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
+                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
 
-        // submit to ElasticSearch
-        this.manager.getClient()
-                .admin()
-                .indices()
-                .updateSettings(updateSettingsRequest)
-                .actionGet();
+                // submit to ElasticSearch
+                this.manager.getClient()
+                        .admin()
+                        .indices()
+                        .updateSettings(updateSettingsRequest)
+                        .actionGet();
+            }
+        }
     }
 
     public void createIndexIfMissing(String indexName) {
+        // Synchronize this on a static class level
         if (!this.manager.getClient()
                 .admin()
                 .indices()
                 .exists(new IndicesExistsRequest(indexName))
                 .actionGet()
-                .isExists()) {
+                .isExists())
+        {
             // It does not exist... So we are going to need to create the index.
             // we are going to assume that the 'templates' that we have loaded into
             // elasticsearch are sufficient to ensure the index is being created properly.
             CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
 
             if (response.isAcknowledged()) {
-                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+                LOGGER.info("Index Created: {}", indexName);
             } else {
                 LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
                 LOGGER.error("Error Message: {}", response.toString());
@@ -533,22 +424,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
         }
     }
 
-    public void add(String indexName, String type, Map<String, Object> toImport) {
-        for (String id : toImport.keySet())
-            add(indexName, type, id, (String) toImport.get(id));
-    }
-
-    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch) {
-        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
-        for (String toAddId : workingBatch.keySet())
-            if (!invalidIDs.contains(toAddId))
-                add(index, type, toAddId, workingBatch.get(toAddId));
-
-        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
-    }
-
-
+    /**
+     *
     private Set<String> checkIds(Set<String> input, String index, String type)
{
 
         IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
@@ -575,102 +452,90 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
         return toReturn;
     }
+    */
 
-    @Override
     public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
-        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
-        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
-        start();
-    }
+        this.veryLargeBulk = config.getBulk() == null ?
+                Boolean.FALSE :
+                config.getBulk();
 
-    /**
-     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
-     * @param bulkRequest
-     * @param thisSent
-     * @param thisSizeInBytes
-     */
-    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
-        LOGGER.debug("Attempting to write {} items to ES", thisSent);
-        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
-        this.addResponseFuture(responseFuture);
-        responseFuture.addListener(new ActionListener<BulkResponse>() {
-            @Override
-            public void onResponse(BulkResponse bulkItemResponses) {
-                lastWrite.set(System.currentTimeMillis());
-                removeResponseFuture(responseFuture);
-
-                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
-            }
+        this.flushThresholdsRecords = config.getBatchSize() == null ?
+                DEFAULT_BATCH_SIZE :
+                (int)(config.getBatchSize().longValue());
 
-            @Override
-            public void onFailure(Throwable e) {
-                LOGGER.error("Error bulk loading: {}", e.getMessage());
-                removeResponseFuture(responseFuture);
-                e.printStackTrace();
-            }
-        });
-    }
+        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
+                config.getMaxTimeBetweenFlushMs() :
+                DEFAULT_MAX_WAIT;
 
-    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
-        if (bulkItemResponses.hasFailures())
-            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
+        this.flushThresholdBytes = config.getBatchBytes() == null ?
+                DEFAULT_BULK_FLUSH_THRESHOLD :
+                config.getBatchBytes();
 
-        long thisFailed = 0;
-        long thisOk = 0;
-        long thisMillis = bulkItemResponses.getTookInMillis();
+        timer.scheduleAtFixedRate(new TimerTask() {
+            public void run() {
+                checkForFlush();
+            }
+        }, this.flushThresholdTime, this.flushThresholdTime);
 
-        // keep track of the number of totalFailed and items that we have totalOk.
-        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
-            if (resp.isFailed())
-                thisFailed++;
-            else
-                thisOk++;
-        }
+    }
 
-        synchronized(countLock) {
-            totalAttempted += thisSent;
-            totalOk += thisOk;
-            totalFailed += thisFailed;
-            totalSeconds += (thisMillis / 1000);
-            lock.writeLock().unlock();
-        }
+    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
+        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
 
-        if (thisSent != (thisOk + thisFailed))
-            LOGGER.error("We sent more items than this");
 
-        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items
with {} failures in {}seconds] {} outstanding]",
-                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk),
NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
-                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)),
NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds),
NUMBER_FORMAT.format(getTotalOutstanding()));
+        // record the last time we flushed the index
+        this.lastFlush = new Date().getTime();
 
-    }
+        // add the totals
+        this.totalSent.addAndGet(sent);
 
+        // add the total number of batches sent
+        this.batchesSent.incrementAndGet();
 
-    private void checkAndCreateBulkRequest() {
-        // Synchronize to ensure that we don't lose any records
-        lock.writeLock().lock();
         try {
-            if (bulkRequest == null)
-                bulkRequest = this.manager.getClient().prepareBulk();
-        } finally {
-            lock.writeLock().unlock();
-        }
-    }
+            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
+                public void onResponse(BulkResponse bulkItemResponses) {
+                    batchesResponded.incrementAndGet();
+                    updateTotals(bulkItemResponses, sent, sizeInBytes);
+                }
 
-    //Locking on a separate object than the writer as these objects are intended to be handled separately
-    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
-        synchronized (requestLock) {
-            this.responses.add(future);
+                public void onFailure(Throwable throwable) {
+                    batchesResponded.incrementAndGet();
+                    throwable.printStackTrace();
+                }
+            });
+        }
+        catch(Throwable e) {
+            LOGGER.error("There was an error sending the batch: {}", e.getMessage());
         }
     }
 
-    //Locking on a separate object than the writer as these objects are intended to be handled separately
-    private void removeResponseFuture(ListenableActionFuture<BulkResponse> future) {
-        synchronized(requestLock) {
-            this.responses.remove(future);
+    private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) {
+        long failed = 0;
+        long passed = 0;
+        long millis = bulkItemResponses.getTookInMillis();
+
+        // keep track of the number of totalFailed and items that we have totalOk.
+        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
+            if (resp == null || resp.isFailed())
+                failed++;
+            else
+                passed++;
         }
-    }
 
-}
+        if (failed > 0)
+            LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent);
+
+        this.totalOk.addAndGet(passed);
+        this.totalFailed.addAndGet(failed);
+        this.totalSeconds.addAndGet(millis / 1000);
+        this.totalSizeInBytes.addAndGet(sizeInBytes);
 
+        if (sent != (passed + failed))
+            LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed);
+
+        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items
with {} failures in {}seconds] {} outstanding]",
+                MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed),
NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis),
+                MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 *
1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds),
NUMBER_FORMAT.format(getTotalOutstanding()));
+    }
+}
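
----------------------------------------------------------------------
For illustration only (not part of the commit): a rough sketch of how the
rewritten writer might be driven end to end, based only on methods visible in
this diff. It assumes a reachable Elasticsearch cluster, a configuration block
carrying index/type and connection details, and a StreamsDatum(Object)
constructor; anything not shown in the diff is an assumption.

    import org.apache.streams.config.StreamsConfigurator;
    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
    import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
    import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

    public class PersistWriterSketch {
        public static void main(String[] args) {
            // Same lookup the writer's no-arg constructor performs: the "elasticsearch"
            // block of the stream configuration.
            ElasticsearchWriterConfiguration config = ElasticsearchConfigurator
                    .detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch"));

            ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config);

            // prepare() picks up batchSize/batchBytes/maxTimeBetweenFlushMs from the
            // configuration and schedules the periodic checkForFlush() timer.
            writer.prepare(null);

            // The thresholds can also be tuned through the setters added in this commit.
            writer.setFlushThresholdBytes(5L * 1024L * 1024L);  // flush once 5 MB are buffered
            writer.setFlushThresholdRecords(100);               // ...or 100 documents

            // Each datum is serialized (metadata, if present, folded in under "_metadata")
            // and added to the pending BulkRequest; flushes happen asynchronously.
            writer.write(new StreamsDatum("{\"message\":\"hello\"}"));  // StreamsDatum(Object) assumed

            // Flushes the final batch, waits for outstanding bulk responses,
            // and refreshes the affected indexes.
            writer.cleanUp();
        }
    }
----------------------------------------------------------------------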

