Repository: incubator-streams
Updated Branches:
refs/heads/master ca5cf347c -> 8d9986af7
updater / deleter implementation
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/0be789cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/0be789cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/0be789cf
Branch: refs/heads/master
Commit: 0be789cfdb23652f9f07e394b885598887f91ee7
Parents: c2d59a2
Author: sblackmon <sblackmon@w2odigital.com>
Authored: Mon Aug 4 13:30:36 2014 -0500
Committer: sblackmon <sblackmon@w2odigital.com>
Committed: Mon Aug 4 13:30:36 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistDeleter.java | 102 +++++
.../ElasticsearchPersistUpdater.java | 413 ++-----------------
.../ElasticsearchPersistWriter.java | 29 +-
3 files changed, 146 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
new file mode 100644
index 0000000..fece72e
--- /dev/null
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter
{
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class);
+
+ public ElasticsearchPersistDeleter() {
+ super();
+ }
+
+ public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) {
+ super(config);
+ }
+
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+
+ Preconditions.checkNotNull(streamsDatum);
+ Preconditions.checkNotNull(streamsDatum.getDocument());
+ Preconditions.checkNotNull(streamsDatum.getMetadata());
+ Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+
+ String index;
+ String type;
+ String id;
+
+ index = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("index"))
+ .or(config.getIndex());
+ type = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("type"))
+ .or(config.getType());
+ id = (String) streamsDatum.getMetadata().get("id");
+
+ delete(index, type, id);
+
+ }
+
+ public void delete(String index, String type, String id) {
+ DeleteRequest deleteRequest;
+
+ Preconditions.checkNotNull(index);
+ Preconditions.checkNotNull(id);
+ Preconditions.checkNotNull(type);
+
+ // They didn't specify an ID, so we will create one for them.
+ deleteRequest = new DeleteRequest()
+ .index(index)
+ .type(type)
+ .id(id);
+
+ add(deleteRequest);
+
+ }
+
+ public void add(DeleteRequest request) {
+
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(request.index());
+
+ // If our queue is larger than our flush threshold, then we should flush the queue.
+ synchronized (this) {
+ checkIndexImplications(request.index());
+
+ bulkRequest.add(request);
+
+ currentBatchItems.incrementAndGet();
+
+ checkForFlush();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index b2e7556..dbf7d25 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -56,96 +56,16 @@ import java.util.*;
//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter,
Flushable, Closeable {
- private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
- private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
- private final static NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
-
- protected ElasticsearchClientManager manager;
- protected Client client;
- private String parentID = null;
- protected BulkRequestBuilder bulkRequest;
- private OutputStreamWriter currentWriter = null;
-
- protected String index = null;
- protected String type = null;
- private int batchSize = 50;
- private int totalRecordsWritten = 0;
- private boolean veryLargeBulk = false; // by default this setting is set to false
-
- private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
- private static final long WAITING_DOCS_LIMIT = 10000;
-
- public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
-
- private volatile int totalSent = 0;
- private volatile int totalSeconds = 0;
- private volatile int totalOk = 0;
- private volatile int totalFailed = 0;
- private volatile int totalBatchCount = 0;
- private volatile long totalSizeInBytes = 0;
-
- private volatile long batchSizeInBytes = 0;
- private volatile int batchItemsSent = 0;
-
- public void setIndex(String index) {
- this.index = index;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- private final List<String> affectedIndexes = new ArrayList<String>();
+public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter
{
- public long getFlushThresholdSizeInBytes() {
- return flushThresholdSizeInBytes;
- }
-
- public int getTotalBatchCount() {
- return totalBatchCount;
- }
-
- public long getBatchSizeInBytes() {
- return batchSizeInBytes;
- }
-
- public int getBatchItemsSent() {
- return batchItemsSent;
- }
-
- public void setFlushThresholdSizeInBytes(long sizeInBytes) {
- this.flushThresholdSizeInBytes = sizeInBytes;
- }
-
- Thread task;
-
- protected volatile Queue<StreamsDatum> persistQueue;
-
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- private ElasticsearchConfiguration config;
+ private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class);
public ElasticsearchPersistUpdater() {
- Config config = StreamsConfigurator.config.getConfig("elasticsearch");
- this.config = ElasticsearchConfigurator.detectConfiguration(config);
+ super();
}
- public ElasticsearchPersistUpdater(ElasticsearchConfiguration config) {
- this.config = config;
- }
-
- private static final int BYTES_IN_MB = 1024 * 1024;
- private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
-
- public boolean isConnected() {
- return (client != null);
+ public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) {
+ super(config);
}
@Override
@@ -156,15 +76,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter
impl
Preconditions.checkNotNull(streamsDatum.getMetadata());
Preconditions.checkNotNull(streamsDatum.getMetadata().get("id"));
+ String index;
+ String type;
String id;
String json;
try {
- json = mapper.writeValueAsString(streamsDatum.getDocument());
+ json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
+ index = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("index"))
+ .or(config.getIndex());
+ type = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("type"))
+ .or(config.getType());
id = (String) streamsDatum.getMetadata().get("id");
- add(index, type, id, json);
+ update(index, type, id, json);
} catch (JsonProcessingException e) {
LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
@@ -172,184 +100,7 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter
impl
}
}
- public void start() {
-
- manager = new ElasticsearchClientManager(config);
- client = manager.getClient();
-
- LOGGER.info(client.toString());
- }
-
- public void cleanUp() {
-
- try {
- flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- close();
- }
-
- @Override
- public void close() {
- try {
- // before they close, check to ensure that
- this.flush();
-
- int count = 0;
- // We are going to give it 5 minutes.
- while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
- Thread.sleep(50);
-
- if (this.getTotalOutstanding() > 0) {
- LOGGER.error("We never cleared our buffer");
- }
-
-
- for (String indexName : this.getAffectedIndexes()) {
- createIndexIfMissing(indexName);
-
- if (this.veryLargeBulk) {
- LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
- // They are in 'very large bulk' mode and the process is finished. We
now want to turn the
- // refreshing back on.
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval",
"5s"));
-
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
-
- checkIndexImplications(indexName);
-
- LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
- this.manager.getClient()
- .admin()
- .indices()
- .prepareRefresh(indexName)
- .execute()
- .actionGet();
- }
-
- LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(),
this.getTotalFailed());
-
- } catch (Exception e) {
- // this line of code should be logically unreachable.
- LOGGER.warn("This is unexpected: {}", e.getMessage());
- e.printStackTrace();
- }
- }
-
- @Override
- public void flush() throws IOException {
- flushInternal();
- }
-
- public void flushInternal() {
- synchronized (this) {
- // we do not have a working bulk request, we can just exit here.
- if (this.bulkRequest == null || batchItemsSent == 0)
- return;
-
- // call the flush command.
- flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
-
- // null the flush request, this will be created in the 'add' function below
- this.bulkRequest = null;
-
- // record the proper statistics, and add it to our totals.
- this.totalSizeInBytes += this.batchSizeInBytes;
- this.totalSent += batchItemsSent;
-
- // reset the current batch statistics
- this.batchSizeInBytes = 0;
- this.batchItemsSent = 0;
-
- try {
- int count = 0;
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
- /****************************************************************************
- * Author:
- * Smashew
- *
- * Date:
- * 2013-10-20
- *
- * Note:
- * With the information that we have on hand. We need to develop a heuristic
- * that will determine when the cluster is having a problem indexing
records
- * by telling it to pause and wait for it to catch back up. A
- *
- * There is an impact to us, the caller, whenever this happens as well.
Items
- * that are not yet fully indexed by the server sit in a queue, on the
client
- * that can cause the heap to overflow. This has been seen when re-indexing
- * large amounts of data to a small cluster. The "deletes" + "indexes"
can
- * cause the server to have many 'outstandingItems" in queue. Running
this
- * software with large amounts of data, on a small cluster, can re-create
- * this problem.
- *
- * DO NOT DELETE THESE LINES
- ****************************************************************************/
-
- // wait for the flush to catch up. We are going to cap this at
- while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT &&
count++ < 500)
- Thread.sleep(10);
-
- if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
- LOGGER.warn("Even after back-off there are {} items still in queue.",
this.getTotalOutstanding());
- }
- } catch (Exception e) {
- LOGGER.info("We were broken from our loop: {}", e.getMessage());
- }
- }
- }
-
- private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final
Long thisSizeInBytes) {
- bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
- @Override
- public void onResponse(BulkResponse bulkItemResponses) {
- if (bulkItemResponses.hasFailures())
- LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
-
- long thisFailed = 0;
- long thisOk = 0;
- long thisMillis = bulkItemResponses.getTookInMillis();
-
- // keep track of the number of totalFailed and items that we have totalOk.
- for (BulkItemResponse resp : bulkItemResponses.getItems()) {
- if (resp.isFailed())
- thisFailed++;
- else
- thisOk++;
- }
-
- totalOk += thisOk;
- totalFailed += thisFailed;
- totalSeconds += (thisMillis / 1000);
-
- if (thisSent != (thisOk + thisFailed))
- LOGGER.error("We sent more items than this");
-
- LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb
{} items with {} failures in {}seconds] {} outstanding]",
- MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024
* 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
- MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024
* 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds),
NUMBER_FORMAT.format(getTotalOutstanding()));
- }
-
- @Override
- public void onFailure(Throwable e) {
- LOGGER.error("Error bulk loading: {}", e.getMessage());
- e.printStackTrace();
- }
- });
-
- this.notify();
- }
-
- public void add(String indexName, String type, String id, String json) {
+ public void update(String indexName, String type, String id, String json) {
UpdateRequest updateRequest;
Preconditions.checkNotNull(id);
@@ -366,137 +117,23 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter
impl
}
- public void add(UpdateRequest updateRequest) {
- Preconditions.checkNotNull(updateRequest);
- synchronized (this) {
- checkAndCreateBulkRequest();
- checkIndexImplications(updateRequest.index());
- bulkRequest.add(updateRequest);
- try {
- Optional<Integer> size = Objects.firstNonNull(
- Optional.fromNullable(updateRequest.doc().source().length()),
- Optional.fromNullable(updateRequest.script().length()));
- trackItemAndBytesWritten(size.get().longValue());
- } catch (NullPointerException x) {
- trackItemAndBytesWritten(1000);
- }
- }
- }
+ public void add(UpdateRequest request) {
- private void trackItemAndBytesWritten(long sizeInBytes) {
- batchItemsSent++;
- batchSizeInBytes += sizeInBytes;
+ Preconditions.checkNotNull(request);
+ Preconditions.checkNotNull(request.index());
- // If our queue is larger than our flush threashold, then we should flush the queue.
- if (batchSizeInBytes > flushThresholdSizeInBytes)
- flushInternal();
- }
-
- private void checkAndCreateBulkRequest() {
- // Synchronize to ensure that we don't lose any records
+ // If our queue is larger than our flush threshold, then we should flush the queue.
synchronized (this) {
- if (bulkRequest == null)
- bulkRequest = this.manager.getClient().prepareBulk();
- }
- }
-
- private void checkIndexImplications(String indexName) {
-
- // check to see if we have seen this index before.
- if (this.affectedIndexes.contains(indexName))
- return;
-
- // we haven't log this index.
- this.affectedIndexes.add(indexName);
-
- // Check to see if we are in 'veryLargeBulk' mode
- // if we aren't, exit early
- if (!this.veryLargeBulk)
- return;
-
-
- // They are in 'very large bulk' mode we want to turn off refreshing the index.
-
- // Create a request then add the setting to tell it to stop refreshing the interval
- UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
- updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval",
-1));
-
- // submit to ElasticSearch
- this.manager.getClient()
- .admin()
- .indices()
- .updateSettings(updateSettingsRequest)
- .actionGet();
- }
-
- public void createIndexIfMissing(String indexName) {
- if (!this.manager.getClient()
- .admin()
- .indices()
- .exists(new IndicesExistsRequest(indexName))
- .actionGet()
- .isExists()) {
- // It does not exist... So we are going to need to create the index.
- // we are going to assume that the 'templates' that we have loaded into
- // elasticsearch are sufficient to ensure the index is being created properly.
- CreateIndexResponse response = this.manager.getClient().admin().indices().create(new
CreateIndexRequest(indexName)).actionGet();
-
- if (response.isAcknowledged()) {
- LOGGER.info("Index {} did not exist. The index was automatically created
from the stored ElasticSearch Templates.", indexName);
- } else {
- LOGGER.error("Index {} did not exist. While attempting to create the index
from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
- LOGGER.error("Error Message: {}", response.toString());
- throw new RuntimeException("Unable to create index " + indexName);
- }
- }
- }
+ checkIndexImplications(request.index());
- public void add(String indexName, String type, Map<String, Object> toImport) {
- for (String id : toImport.keySet())
- add(indexName, type, id, (String) toImport.get(id));
- }
-
- private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
{
- Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
-
- for (String toAddId : workingBatch.keySet())
- if (!invalidIDs.contains(toAddId))
- add(index, type, toAddId, workingBatch.get(toAddId));
-
- LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
- }
-
-
- private Set<String> checkIds(Set<String> input, String index, String type)
{
+ bulkRequest.add(request);
- IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
+ currentBatchBytes.addAndGet(request.doc().source().length());
+ currentBatchItems.incrementAndGet();
- for (String s : input)
- idsFilterBuilder.addIds(s);
-
- SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
- .prepareSearch(index)
- .setTypes(type)
- .setQuery(idsFilterBuilder)
- .addField("_id")
- .setSize(input.size());
-
- SearchHits hits = searchRequestBuilder.execute()
- .actionGet()
- .getHits();
-
- Set<String> toReturn = new HashSet<String>();
-
- for (SearchHit hit : hits) {
- toReturn.add(hit.getId());
+ checkForFlush();
}
- return toReturn;
- }
-
- @Override
- public void prepare(Object configurationObject) {
- start();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/0be789cf/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 05b0ef2..b83918d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.TreeNode;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
@@ -66,14 +67,14 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
//A document should have to wait no more than 10s to get flushed
private static final long DEFAULT_MAX_WAIT = 10000;
- private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
+ protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
- private final List<String> affectedIndexes = new ArrayList<String>();
+ protected final List<String> affectedIndexes = new ArrayList<String>();
- private final ElasticsearchClientManager manager;
- private final ElasticsearchWriterConfiguration config;
+ protected final ElasticsearchClientManager manager;
+ protected final ElasticsearchWriterConfiguration config;
- private BulkRequestBuilder bulkRequest;
+ protected BulkRequestBuilder bulkRequest;
private boolean veryLargeBulk = false; // by default this setting is set to false
private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
@@ -87,8 +88,8 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
private final AtomicInteger batchesSent = new AtomicInteger(0);
private final AtomicInteger batchesResponded = new AtomicInteger(0);
- private final AtomicLong currentBatchItems = new AtomicLong(0);
- private final AtomicLong currentBatchBytes = new AtomicLong(0);
+ protected final AtomicLong currentBatchItems = new AtomicLong(0);
+ protected final AtomicLong currentBatchBytes = new AtomicLong(0);
private final AtomicLong totalSent = new AtomicLong(0);
private final AtomicLong totalSeconds = new AtomicLong(0);
@@ -142,8 +143,16 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
checkForBackOff();
+ String index = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("index"))
+ .or(config.getIndex());
+ String type = Optional.fromNullable(
+ (String) streamsDatum.getMetadata().get("type"))
+ .or(config.getType());
+ String id = (String) streamsDatum.getMetadata().get("id");
+
try {
- add(config.getIndex(), config.getType(), streamsDatum.getId(),
+ add(index, type, id,
streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis())
: Long.toString(streamsDatum.getTimestamp().getMillis()),
convertAndAppendMetadata(streamsDatum));
} catch (Throwable e) {
@@ -333,7 +342,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
}
}
- private void checkForFlush() {
+ protected void checkForFlush() {
synchronized (this) {
if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
this.currentBatchItems.get() >= this.flushThresholdsRecords ||
@@ -344,7 +353,7 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
DatumSt
}
}
- private void checkIndexImplications(String indexName) {
+ protected void checkIndexImplications(String indexName) {
// We need this to be safe across all writers that are currently being executed
synchronized (ElasticsearchPersistWriter.class) {
|