Return-Path: X-Original-To: apmail-streams-dev-archive@minotaur.apache.org Delivered-To: apmail-streams-dev-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CE42A11D2F for ; Fri, 27 Jun 2014 13:07:57 +0000 (UTC) Received: (qmail 29707 invoked by uid 500); 27 Jun 2014 13:07:57 -0000 Delivered-To: apmail-streams-dev-archive@streams.apache.org Received: (qmail 29664 invoked by uid 500); 27 Jun 2014 13:07:57 -0000 Mailing-List: contact dev-help@streams.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@streams.incubator.apache.org Delivered-To: mailing list dev@streams.incubator.apache.org Received: (qmail 29653 invoked by uid 99); 27 Jun 2014 13:07:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jun 2014 13:07:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 27 Jun 2014 13:07:55 +0000 Received: (qmail 28676 invoked by uid 99); 27 Jun 2014 13:07:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 27 Jun 2014 13:07:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 87D2098DA81; Fri, 27 Jun 2014 13:07:35 +0000 (UTC) From: mfranklin To: dev@streams.incubator.apache.org Reply-To: dev@streams.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit... 
Content-Type: text/plain Message-Id: <20140627130735.87D2098DA81@tyr.zones.apache.org> Date: Fri, 27 Jun 2014 13:07:35 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Github user mfranklin commented on a diff in the pull request: https://github.com/apache/incubator-streams/pull/45#discussion_r14291458 --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java --- @@ -575,102 +452,90 @@ private void checkThenAddBatch(String index, String type, Map wo return toReturn; } + */ - @Override public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk(); - batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue()); - maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue(); - start(); - } + this.veryLargeBulk = config.getBulk() == null ? + Boolean.FALSE : + config.getBulk(); - /** - * This method is to ONLY be called by flushInternal otherwise the counts will be off. - * @param bulkRequest - * @param thisSent - * @param thisSizeInBytes - */ - private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) { - LOGGER.debug("Attempting to write {} items to ES", thisSent); - final ListenableActionFuture responseFuture = bulkRequest.execute(); - this.addResponseFuture(responseFuture); - responseFuture.addListener(new ActionListener() { - @Override - public void onResponse(BulkResponse bulkItemResponses) { - lastWrite.set(System.currentTimeMillis()); - removeResponseFuture(responseFuture); - - updateTotals(bulkItemResponses, thisSent, thisSizeInBytes); - } + this.flushThresholdsRecords = config.getBatchSize() == null ? 
+ DEFAULT_BATCH_SIZE : + (int)(config.getBatchSize().longValue()); - @Override - public void onFailure(Throwable e) { - LOGGER.error("Error bulk loading: {}", e.getMessage()); - removeResponseFuture(responseFuture); - e.printStackTrace(); - } - }); - } + this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ? + config.getMaxTimeBetweenFlushMs() : + DEFAULT_MAX_WAIT; - private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) { - if (bulkItemResponses.hasFailures()) - LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage()); + this.flushThresholdBytes = config.getBatchBytes() == null ? + DEFAULT_BULK_FLUSH_THRESHOLD : + config.getBatchBytes(); - long thisFailed = 0; - long thisOk = 0; - long thisMillis = bulkItemResponses.getTookInMillis(); + timer.scheduleAtFixedRate(new TimerTask() { + public void run() { + checkForFlush(); + } + }, this.flushThresholdTime, this.flushThresholdTime); - // keep track of the number of totalFailed and items that we have totalOk. 
- for (BulkItemResponse resp : bulkItemResponses.getItems()) { - if (resp.isFailed()) - thisFailed++; - else - thisOk++; - } + } - synchronized(countLock) { - totalAttempted += thisSent; - totalOk += thisOk; - totalFailed += thisFailed; - totalSeconds += (thisMillis / 1000); - lock.writeLock().unlock(); - } + private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) { + LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024))); - if (thisSent != (thisOk + thisFailed)) - LOGGER.error("We sent more items than this"); - LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", - MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis), - MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); + // record the last time we flushed the index + this.lastFlush = new Date().getTime(); - } + // add the totals + this.totalSent.addAndGet(sent); + // add the total number of batches sent + this.batchesSent.incrementAndGet(); - private void checkAndCreateBulkRequest() { - // Synchronize to ensure that we don't lose any records - lock.writeLock().lock(); try { - if (bulkRequest == null) - bulkRequest = this.manager.getClient().prepareBulk(); - } finally { - lock.writeLock().unlock(); - } - } + bulkRequest.execute().addListener(new ActionListener() { + public void onResponse(BulkResponse bulkItemResponses) { + batchesResponded.incrementAndGet(); + updateTotals(bulkItemResponses, sent, sizeInBytes); + } - //Locking on a separate object than the writer as these objects are intended to be handled separately - private void 
addResponseFuture(ListenableActionFuture future) { - synchronized (requestLock) { - this.responses.add(future); + public void onFailure(Throwable throwable) { + batchesResponded.incrementAndGet(); + throwable.printStackTrace(); --- End diff -- log the error instead of printing the stack trace --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. ---