From: sblackmon@apache.org
To: commits@streams.incubator.apache.org
Date: Thu, 08 May 2014 19:33:20 -0000
Subject: [5/6] git commit: Merge remote-tracking branch 'origin/pr/7' into streamstutorial

Merge remote-tracking branch 'origin/pr/7' into streamstutorial

Conflicts:
	streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
	streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/145ec847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/145ec847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/145ec847

Branch: refs/heads/streamstutorial
Commit: 145ec8472c9cf12e8e376ce09596f354644e587b
Parents: 7293594 f62afa5
Author: sblackmon
Authored: Thu May 8 14:15:27 2014 -0500
Committer: sblackmon
Committed: Thu May 8 14:15:27 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +-
 .../ElasticsearchPersistWriter.java             | 273 +++++++++----------
 .../ElasticsearchWriterConfiguration.json       |   5 +-
 .../provider/SysomosHeartbeatStream.java        |  45 +--
 .../local/builders/LocalStreamBuilder.java      | 161 +++++++----
 .../local/tasks/StreamsPersistWriterTask.java   |   6 +-
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 7 files changed, 271 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 9ac234c,8a343a6..e07e50f
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@@ -51,17 -51,12 +51,19 @@@ public class ElasticsearchConfigurator
          String index = elasticsearch.getString("index");
          String type = elasticsearch.getString("type");
  
 +        Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
 +
+         if( elasticsearch.hasPath("bulk"))
+             elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
+ 
+         if( elasticsearch.hasPath("batchSize"))
+             elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
+ 
          elasticsearchWriterConfiguration.setIndex(index);
          elasticsearchWriterConfiguration.setType(type);
 +        elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
 +
          return elasticsearchWriterConfiguration;
      }
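
For context, the keys read above come from a Typesafe Config (HOCON) block. The
following is a minimal, self-contained sketch of that optional-key pattern; the
key names are taken from the diff, while the values and the ConfiguratorExample
harness itself are illustrative assumptions, not project code.

----------------------------------------------------------------------
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class ConfiguratorExample {
        public static void main(String[] args) {
            // Example values; only the key names are taken from the diff above.
            Config elasticsearch = ConfigFactory.parseString(
                    "index = \"activity\"\n"
                  + "type = \"activity\"\n"
                  + "bulk = true\n"
                  + "batchSize = 500\n"
                  + "MaxTimeBetweenFlushMs = 30000\n");

            // The same optional-key pattern ElasticsearchConfigurator uses:
            // a missing key yields null, so the writer can fall back to its defaults.
            Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs")
                    ? elasticsearch.getLong("MaxTimeBetweenFlushMs")
                    : null;
            System.out.println("maxMsBeforeFlush = " + maxMsBeforeFlush);
        }
    }
----------------------------------------------------------------------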
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 80d2775,5756e1c..ce23197
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -49,8 -54,11 +54,12 @@@ public class ElasticsearchPersistWrite
      private static final long WAITING_DOCS_LIMIT = 10000;
      private static final int BYTES_IN_MB = 1024 * 1024;
      private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
 +    private static final long DEFAULT_MAX_WAIT = 10000;
++    private static final int DEFAULT_BATCH_SIZE = 100;
  
      private final List affectedIndexes = new ArrayList();
 +    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
 +    private final ReadWriteLock lock = new ReentrantReadWriteLock();
  
      private ObjectMapper mapper = new StreamsJacksonMapper();
      private ElasticsearchClientManager manager;
@@@ -59,11 -67,11 +68,10 @@@
      private String parentID = null;
      private BulkRequestBuilder bulkRequest;
      private OutputStreamWriter currentWriter = null;
 -    private int batchSize = 50;
 -    private int totalRecordsWritten = 0;
 -    private long maxMsBeforeFlush;
++    private int batchSize;
++    private long maxTimeBetweenFlushMs;
 +    private boolean veryLargeBulk = false; // by default this setting is set to false
- 
-     private long batchSize;
-     private boolean veryLargeBulk; // by default this setting is set to false
-     private int totalRecordsWritten = 0;
- 
      protected Thread task;
      protected volatile Queue persistQueue;
@@@ -99,8 -106,7 +108,6 @@@
          this.veryLargeBulk = veryLargeBulk;
      }
  
-     private final List affectedIndexes = new ArrayList();
--
      public int getTotalOutstanding() {
          return this.totalSent - (this.totalFailed + this.totalOk);
      }
@@@ -151,6 -155,6 +156,14 @@@
          this.flushThresholdSizeInBytes = sizeInBytes;
      }
  
++    public long getMaxTimeBetweenFlushMs() {
++        return maxTimeBetweenFlushMs;
++    }
++
++    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
++        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
++    }
++
      public boolean isConnected() {
          return (client != null);
      }
@@@ -252,12 -250,13 +259,6 @@@
      }
  
      @Override
--    public void prepare(Object configurationObject) {
 -        mapper = StreamsJacksonMapper.getInstance();
 -        start();
 -    }
 -
 -    @Override
-         maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
-         mapper = StreamsJacksonMapper.getInstance();
-         start();
-     }
- 
-     @Override
      public DatumStatusCounter getDatumStatusCounter() {
          DatumStatusCounter counters = new DatumStatusCounter();
          counters.incrementAttempt(this.batchItemsSent);
@@@ -267,7 -266,17 +268,17 @@@
      }
  
      public void start() {
- 
 +        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
 +            @Override
 +            public void run() {
 +                LOGGER.debug("Checking to see if data needs to be flushed");
 +                long time = System.currentTimeMillis() - lastWrite.get();
 -                if (time > maxMsBeforeFlush && batchItemsSent > 0) {
++                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
 +                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed. It has been {} since the last write to ES", batchItemsSent, time);
 +                    flushInternal();
 +                }
 +            }
 -        }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
++        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
          manager = new ElasticsearchClientManager(config);
          client = manager.getClient();
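
The scheduled task added to start() is a time-based safety net alongside the
existing size-based flush: if a partial batch has sat idle for longer than
maxTimeBetweenFlushMs, it is flushed rather than left waiting for the byte or
item threshold. A standalone sketch of the pattern follows; IdleFlushExample
and its pendingItems/flush() members are hypothetical stand-ins for the
writer's internals.

----------------------------------------------------------------------
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    public class IdleFlushExample {
        private static final long MAX_TIME_BETWEEN_FLUSH_MS = 10000;

        private final ScheduledExecutorService backgroundFlushTask =
                Executors.newSingleThreadScheduledExecutor();
        private final AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
        private final AtomicInteger pendingItems = new AtomicInteger(0);

        public void start() {
            // Same polling delay the diff uses: twice the allowed idle time.
            backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    long idle = System.currentTimeMillis() - lastWrite.get();
                    if (idle > MAX_TIME_BETWEEN_FLUSH_MS && pendingItems.get() > 0) {
                        flush();
                    }
                }
            }, 0, MAX_TIME_BETWEEN_FLUSH_MS * 2, TimeUnit.MILLISECONDS);
        }

        private void flush() {
            // Hypothetical: drain the pending batch, then reset the idle clock.
            pendingItems.set(0);
            lastWrite.set(System.currentTimeMillis());
        }
    }
----------------------------------------------------------------------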
@@@ -383,70 -394,20 +396,62 @@@
      }
  
      public void add(IndexRequest indexRequest) {
 -        synchronized (this) {
 -            checkAndCreateBulkRequest();
 -            checkIndexImplications(indexRequest.index());
 -            bulkRequest.add(indexRequest);
 -            try {
 -                trackItemAndBytesWritten(indexRequest.source().length());
 -            } catch (NullPointerException x) {
 -                LOGGER.warn("NPE adding/sizing indexrequest");
 -            }
 +        lock.writeLock().lock();
 +        checkAndCreateBulkRequest();
 +        checkIndexImplications(indexRequest.index());
 +        bulkRequest.add(indexRequest);
 +        try {
 +            trackItemAndBytesWritten(indexRequest.source().length());
 +        } catch (NullPointerException x) {
 +            LOGGER.warn("NPE adding/sizing indexrequest");
 +        } finally {
 +            lock.writeLock().unlock();
          }
      }
  
 +    private void trackItemAndBytesWritten(long sizeInBytes)
 +    {
 +        currentItems++;
 +        batchItemsSent++;
 +        batchSizeInBytes += sizeInBytes;
 +
 +        // If our queue is larger than our flush threashold, then we should flush the queue.
 +        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
 +            (currentItems >= batchSize) )
 +            flushInternal();
 +    }
 +
 -    private void checkAndCreateBulkRequest()
 -    {
 -        // Synchronize to ensure that we don't lose any records
 -        synchronized (this)
 -        {
 -            if(bulkRequest == null)
 -                bulkRequest = this.manager.getClient().prepareBulk();
 -        }
 -    }
 -
 +    private void checkIndexImplications(String indexName)
 +    {
 +
 +        // check to see if we have seen this index before.
 +        if(this.affectedIndexes.contains(indexName))
 +            return;
 +
 +        // we haven't log this index.
 +        this.affectedIndexes.add(indexName);
 +
 +        // Check to see if we are in 'veryLargeBulk' mode
 +        // if we aren't, exit early
 +        if(!this.veryLargeBulk)
 +            return;
 +
 +
 +        // They are in 'very large bulk' mode we want to turn off refreshing the index.
 +
 +        // Create a request then add the setting to tell it to stop refreshing the interval
 +        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
 +        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
 +
 +        // submit to ElasticSearch
 +        this.manager.getClient()
 +                .admin()
 +                .indices()
 +                .updateSettings(updateSettingsRequest)
 +                .actionGet();
 +    }
 +
      public void createIndexIfMissing(String indexName) {
          if (!this.manager.getClient()
                  .admin()
@@@ -512,15 -473,15 +517,24 @@@
          return toReturn;
      }
  
 +    @Override
 +    public void prepare(Object configurationObject) {
 +        mapper = StreamsJacksonMapper.getInstance();
 -        veryLargeBulk = this.config.getBulk();
 -        batchSize = this.config.getBatchSize();
++        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
++        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
++        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
 +        start();
 +    }
 +
 +    /**
 +     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
 +     * @param bulkRequest
 +     * @param thisSent
 +     * @param thisSizeInBytes
 +     */
      private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
 +        final Object messenger = new Object();
 +
          LOGGER.debug("Attempting to write {} items to ES", thisSent);
          bulkRequest.execute().addListener(new ActionListener() {
              @Override
              public void onResponse(BulkResponse bulkItemResponses) {
@@@ -558,18 -521,16 +574,9 @@@
                  e.printStackTrace();
              }
          });
- 
-         this.notify();
      }
  
--    private void trackItemAndBytesWritten(long sizeInBytes) {
--        batchItemsSent++;
--        batchSizeInBytes += sizeInBytes;
--        // If our queue is larger than our flush threashold, then we should flush the queue.
--        if (batchSizeInBytes > flushThresholdSizeInBytes)
--            flushInternal();
--    }
  
      private void checkAndCreateBulkRequest() {
          // Synchronize to ensure that we don't lose any records
@@@ -579,32 -543,32 +589,5 @@@
          }
      }
  
--    private void checkIndexImplications(String indexName) {
--
--        // check to see if we have seen this index before.
--        if (this.affectedIndexes.contains(indexName))
--            return;
--
--        // we haven't log this index.
--        this.affectedIndexes.add(indexName);
--
--        // Check to see if we are in 'veryLargeBulk' mode
--        // if we aren't, exit early
--        if (!this.veryLargeBulk)
--            return;
--
--
--        // They are in 'very large bulk' mode we want to turn off refreshing the index.
--
--        // Create a request then add the setting to tell it to stop refreshing the interval
--        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
--        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
--
--        // submit to ElasticSearch
--        this.manager.getClient()
--                .admin()
--                .indices()
--                .updateSettings(updateSettingsRequest)
--                .actionGet();
--    }
  }
++
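
In 'veryLargeBulk' mode the writer turns off index refresh before loading. A
common companion step, not part of this commit, is restoring the refresh
interval once the load finishes. A hedged sketch of both halves is below; it
assumes the same 0.90-era Java client API the diff uses (UpdateSettingsRequest
moved to the settings.put package, and ImmutableSettings was removed, in later
Elasticsearch releases), and RefreshToggle itself is hypothetical.

----------------------------------------------------------------------
    import org.elasticsearch.action.admin.indices.settings.UpdateSettingsRequest;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.common.settings.ImmutableSettings;

    public class RefreshToggle {

        // Disable refresh while bulk loading; "-1" turns refreshing off entirely.
        public static void disableRefresh(Client client, String indexName) {
            UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
            request.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
            client.admin().indices().updateSettings(request).actionGet();
        }

        // Restore refresh after the load; "1s" is the Elasticsearch default.
        public static void enableRefresh(Client client, String indexName) {
            UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
            request.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "1s"));
            client.admin().indices().updateSettings(request).actionGet();
        }
    }
----------------------------------------------------------------------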
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 5e169b7,c56aa82..b107be6
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@@ -14,15 -14,9 +14,18 @@@
              "type": "string",
              "description": "Type to write as"
          },
 -        "MaxTimeBetweenFlushMs": {
 -            "type": "String",
 -            "format": "utc-millisec"
 +        "bulk": {
 +            "type": "boolean",
 +            "description": "Index in large or small batches",
 +            "default": "false"
 +        },
 +        "batchSize": {
 +            "type": "integer",
 +            "description": "Item Count before flush",
 +            "default": 100
-         }
++        },
++        "maxTimeBetweenFlushMs": {
++            "type": "integer"
 +        }
      }
  }
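
Taken together, a configuration exercising all three new schema fields could be
built like this. The setters are exactly the ones the diff calls; the no-arg
constructor is the one jsonschema2pojo generates by default, and the values and
the WriterConfigurationExample wrapper are illustrative assumptions.

----------------------------------------------------------------------
    import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

    public class WriterConfigurationExample {
        public static ElasticsearchWriterConfiguration exampleConfig() {
            ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
            config.setIndex("activity");              // index to write into
            config.setType("activity");               // type to write as
            config.setBulk(true);                     // opt into the large-batch path
            config.setBatchSize(500L);                // flush every 500 items...
            config.setMaxTimeBetweenFlushMs(30000L);  // ...or after 30s without a write
            return config;
        }
    }
----------------------------------------------------------------------

Any field left null falls back to the defaults applied in prepare(): batchSize
to DEFAULT_BATCH_SIZE (100) and maxTimeBetweenFlushMs to DEFAULT_MAX_WAIT
(10000 ms).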