streams-dev mailing list archives

From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...
Date Fri, 27 Jun 2014 13:07:35 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14291458
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -575,102 +452,90 @@ private void checkThenAddBatch(String index, String type, Map<String, String> wo
     
             return toReturn;
         }
    +    */
     
    -    @Override
         public void prepare(Object configurationObject) {
    -        mapper = StreamsJacksonMapper.getInstance();
    -        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
    -        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
    -        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
    -        start();
    -    }
    +        this.veryLargeBulk = config.getBulk() == null ?
    +                Boolean.FALSE :
    +                config.getBulk();
     
    -    /**
    -     * This method is to ONLY be called by flushInternal otherwise the counts will be off.
    -     * @param bulkRequest
    -     * @param thisSent
    -     * @param thisSizeInBytes
    -     */
    -    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
    -        LOGGER.debug("Attempting to write {} items to ES", thisSent);
    -        final ListenableActionFuture<BulkResponse> responseFuture = bulkRequest.execute();
    -        this.addResponseFuture(responseFuture);
    -        responseFuture.addListener(new ActionListener<BulkResponse>() {
    -            @Override
    -            public void onResponse(BulkResponse bulkItemResponses) {
    -                lastWrite.set(System.currentTimeMillis());
    -                removeResponseFuture(responseFuture);
    -
    -                updateTotals(bulkItemResponses, thisSent, thisSizeInBytes);
    -            }
    +        this.flushThresholdsRecords = config.getBatchSize() == null ?
    +                DEFAULT_BATCH_SIZE :
    +                (int)(config.getBatchSize().longValue());
     
    -            @Override
    -            public void onFailure(Throwable e) {
    -                LOGGER.error("Error bulk loading: {}", e.getMessage());
    -                removeResponseFuture(responseFuture);
    -                e.printStackTrace();
    -            }
    -        });
    -    }
    +        this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ?
    +                config.getMaxTimeBetweenFlushMs() :
    +                DEFAULT_MAX_WAIT;
     
    -    private void updateTotals(BulkResponse bulkItemResponses, Integer thisSent, double thisSizeInBytes) {
    -        if (bulkItemResponses.hasFailures())
    -            LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());
    +        this.flushThresholdBytes = config.getBatchBytes() == null ?
    +                DEFAULT_BULK_FLUSH_THRESHOLD :
    +                config.getBatchBytes();
     
    -        long thisFailed = 0;
    -        long thisOk = 0;
    -        long thisMillis = bulkItemResponses.getTookInMillis();
    +        timer.scheduleAtFixedRate(new TimerTask() {
    +            public void run() {
    +                checkForFlush();
    +            }
    +        }, this.flushThresholdTime, this.flushThresholdTime);
     
    -        // keep track of the number of totalFailed and items that we have totalOk.
    -        for (BulkItemResponse resp : bulkItemResponses.getItems()) {
    -            if (resp.isFailed())
    -                thisFailed++;
    -            else
    -                thisOk++;
    -        }
    +    }
     
    -        synchronized(countLock) {
    -            totalAttempted += thisSent;
    -            totalOk += thisOk;
    -            totalFailed += thisFailed;
    -            totalSeconds += (thisMillis / 1000);
    -            lock.writeLock().unlock();
    -        }
    +    private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) {
    +        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)));
     
    -        if (thisSent != (thisOk + thisFailed))
    -            LOGGER.error("We sent more items than this");
     
    -        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
    -                MEGABYTE_FORMAT.format(thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
    -                MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
    +        // record the last time we flushed the index
    +        this.lastFlush = new Date().getTime();
     
    -    }
    +        // add the totals
    +        this.totalSent.addAndGet(sent);
     
    +        // add the total number of batches sent
    +        this.batchesSent.incrementAndGet();
     
    -    private void checkAndCreateBulkRequest() {
    -        // Synchronize to ensure that we don't lose any records
    -        lock.writeLock().lock();
             try {
    -            if (bulkRequest == null)
    -                bulkRequest = this.manager.getClient().prepareBulk();
    -        } finally {
    -            lock.writeLock().unlock();
    -        }
    -    }
    +            bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
    +                public void onResponse(BulkResponse bulkItemResponses) {
    +                    batchesResponded.incrementAndGet();
    +                    updateTotals(bulkItemResponses, sent, sizeInBytes);
    +                }
     
    -    //Locking on a separate object than the writer as these objects are intended to be handled separately
    -    private void addResponseFuture(ListenableActionFuture<BulkResponse> future) {
    -        synchronized (requestLock) {
    -            this.responses.add(future);
    +                public void onFailure(Throwable throwable) {
    +                    batchesResponded.incrementAndGet();
    +                    throwable.printStackTrace();
    --- End diff --
    
    Log this instead of printing the stack trace.
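    
    For example, a minimal sketch of what that change could look like, reusing the listener shape and the names already present in the diff above (LOGGER, batchesResponded, updateTotals, sent, sizeInBytes); the exact log message is only illustrative, not a proposed final patch:
    
        bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
            public void onResponse(BulkResponse bulkItemResponses) {
                batchesResponded.incrementAndGet();
                updateTotals(bulkItemResponses, sent, sizeInBytes);
            }
    
            public void onFailure(Throwable throwable) {
                batchesResponded.incrementAndGet();
                // route the failure through the class's SLF4J logger, which keeps
                // the stack trace, instead of writing to stderr via printStackTrace()
                LOGGER.error("Error executing bulk request", throwable);
            }
        });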


