streams-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From smashew <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...
Date Fri, 27 Jun 2014 14:54:43 GMT
Github user smashew commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14296411
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
---
    @@ -417,114 +313,109 @@ public void add(String indexName, String type, String id, String
ts, String json
             if(ts != null)
                 indexRequestBuilder.setTimestamp(ts);
     
    -        // If there is a parentID that is associated with this bulk, then we are
    -        // going to have to parse the raw JSON and attempt to dereference
    -        // what the parent document should be
    -        if (parentID != null) {
    -            try {
    -                // The JSONObject constructor can throw an exception, it is called
    -                // out explicitly here so we can catch it.
    -                indexRequestBuilder.setParent(new JSONObject(json).getString(parentID));
    -            }
    -            catch(JSONException e)
    -            {
    -                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id,
indexName, type, e.getMessage());
    -                totalFailed++;
    -            }
    -        }
             add(indexRequestBuilder.request());
         }
     
    -    public void add(UpdateRequest updateRequest) {
    -        Preconditions.checkNotNull(updateRequest);
    -        lock.writeLock().lock();
    +    /**
    +     *  This function is trashed... needs to be fixed.
    +     *
    +    private synchronized void add(UpdateRequest request) {
    +        Preconditions.checkNotNull(request);
             checkAndCreateBulkRequest();
    -        checkIndexImplications(updateRequest.index());
    -        bulkRequest.add(updateRequest);
    +
    +        checkIndexImplications(request.index());
    +
    +        bulkRequest.add(request);
             try {
                 Optional<Integer> size = Objects.firstNonNull(
    -                    Optional.fromNullable(updateRequest.doc().source().length()),
    -                    Optional.fromNullable(updateRequest.script().length()));
    +                    Optional.fromNullable(request.doc().source().length()),
    +                    Optional.fromNullable(request.script().length()));
                 trackItemAndBytesWritten(size.get().longValue());
             } catch (NullPointerException x) {
                 trackItemAndBytesWritten(1000);
    -        } finally {
    -            lock.writeLock().unlock();
             }
         }
    +    */
     
    -    public void add(IndexRequest indexRequest) {
    -        lock.writeLock().lock();
    -        checkAndCreateBulkRequest();
    -        checkIndexImplications(indexRequest.index());
    -        bulkRequest.add(indexRequest);
    -        try {
    -            trackItemAndBytesWritten(indexRequest.source().length());
    -        } catch (NullPointerException x) {
    -            LOGGER.warn("NPE adding/sizing indexrequest");
    -        } finally {
    -            lock.writeLock().unlock();
    +    protected void add(IndexRequest request) {
    +
    +        Preconditions.checkNotNull(request);
    +        Preconditions.checkNotNull(request.index());
    +
    +        // If our queue is larger than our flush threshold, then we should flush the
queue.
    +        synchronized (this) {
    +            checkIndexImplications(request.index());
    +
    +            bulkRequest.add(request);
    +
    +            this.currentBatchBytes.addAndGet(request.source().length());
    +            this.currentBatchItems.incrementAndGet();
    +
    +            checkForFlush();
             }
         }
     
    -    private void trackItemAndBytesWritten(long sizeInBytes)
    -    {
    -        currentItems++;
    -        batchItemsSent++;
    -        batchSizeInBytes += sizeInBytes;
    -
    -        // If our queue is larger than our flush threashold, then we should flush the
queue.
    -        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
    -                (currentItems >= batchSize) ) {
    -            flushInternal();
    -            this.currentItems = 0;
    +    private void checkForFlush() {
    +        synchronized (this) {
    +            if (this.currentBatchBytes.get() >= this.flushThresholdBytes ||
    +                    this.currentBatchItems.get() >= this.flushThresholdsRecords ||
    +                    new Date().getTime() - this.lastFlush >= this.flushThresholdTime)
{
    +                // We should flush
    +                flushInternal();
    +            }
             }
         }
     
    -    private void checkIndexImplications(String indexName)
    -    {
    +    private void checkIndexImplications(String indexName) {
    +        // We need this to be safe across all writers that are currently being executed
    +        synchronized (ElasticsearchPersistWriter.class) {
    --- End diff --
    
so as not to run too many check-index requests simultaneously.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message