streams-dev mailing list archives

From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...
Date Fri, 27 Jun 2014 16:03:10 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300390
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    -                ts = Long.toString(streamsDatum.getTimestamp().getMillis());
    +        String docAsJson = (object instanceof String) ? object.toString() : OBJECT_MAPPER.writeValueAsString(object);
    +        if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0)
    +            return docAsJson;
    +        else {
    +            ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson);
    +            try {
    +                node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
                 }
    -            if (streamsDatum.getDocument() instanceof String)
    -                json = streamsDatum.getDocument().toString();
    -            else {
    -                json = mapper.writeValueAsString(streamsDatum.getDocument());
    +            catch(Throwable e) {
    +                LOGGER.warn("Unable to write metadata");
                 }
    -
    -            add(config.getIndex(), config.getType(), id, ts, json);
    -
    -        } catch (Exception e) {
    -            LOGGER.warn("{} {}", e.getMessage());
    -            e.printStackTrace();
    +            return OBJECT_MAPPER.writeValueAsString(node);
             }
         }
     
         public void cleanUp() {
    -
             try {
    -            flush();
    -            backgroundFlushTask.shutdownNow();
    -        } catch (IOException e) {
    -            e.printStackTrace();
    -        }
    -        close();
    -    }
     
    -    @Override
    -    public void close() {
    -        try {
                 // before they close, check to ensure that
    -            this.flush();
    -
    -            this.lock.writeLock().lock();
    -
    -            int count = 0;
    -            // We are going to give it 5 minutes.
    -            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5) {
    -                for(ListenableActionFuture<BulkResponse> future : responses) {
    -                    if(future.isDone() || future.isCancelled()) {
    -                        BulkResponse response = future.get();
    -                        LOGGER.warn("Found index request for {} items that was closed without notification", response.getItems().length);
    -                        updateTotals(response, 0, 0);
    -                    }
    -                }
    -                Thread.sleep(50);
    -            }
    -
    -            if (this.getTotalOutstanding() > 0) {
    -                LOGGER.error("We never cleared our buffer");
    -            }
    +            flushInternal();
     
    +            waitToCatchUp(0, 5 * 60 * 1000);
    +            refreshIndexes();
     
    -            for (String indexName : this.getAffectedIndexes()) {
    -                createIndexIfMissing(indexName);
    +            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding());
     
    -                if (this.veryLargeBulk) {
    -                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    -                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    -                    // refreshing back on.
    -                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    -                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
    +        } catch (Throwable e) {
    +            // this line of code should be logically unreachable.
    +            LOGGER.warn("This is unexpected: {}", e.getMessage());
    +            e.printStackTrace();
    +        }
    +    }
     
    -                    // submit to ElasticSearch
    -                    this.manager.getClient()
    -                            .admin()
    -                            .indices()
    -                            .updateSettings(updateSettingsRequest)
    -                            .actionGet();
    -                }
    +    private void refreshIndexes() {
    +        for (String indexName : this.affectedIndexes) {
     
    -                checkIndexImplications(indexName);
    +            if (this.veryLargeBulk) {
    +                LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
    +                // They are in 'very large bulk' mode and the process is finished. We now want to turn the
    +                // refreshing back on.
    +                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
    +                updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));
     
    -                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +                // submit to ElasticSearch
                     this.manager.getClient()
                             .admin()
                             .indices()
    -                        .prepareRefresh(indexName)
    -                        .execute()
    +                        .updateSettings(updateSettingsRequest)
                             .actionGet();
                 }
     
    -            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
    +            checkIndexImplications(indexName);
     
    -        } catch (Exception e) {
    -            // this line of code should be logically unreachable.
    -            LOGGER.warn("This is unexpected: {}", e.getMessage());
    -            e.printStackTrace();
    -        } finally {
    -            this.lock.writeLock().unlock();
    +            LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
    +            this.manager.getClient()
    +                    .admin()
    +                    .indices()
    +                    .prepareRefresh(indexName)
    +                    .execute()
    +                    .actionGet();
             }
         }
     
         @Override
    -    public void flush() throws IOException {
    -        flushInternal();
    -    }
    -
    -    @Override
         public DatumStatusCounter getDatumStatusCounter() {
             DatumStatusCounter counters = new DatumStatusCounter();
    -        counters.incrementAttempt(this.batchItemsSent);
    -        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
    -        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
    +        counters.incrementStatus(DatumStatus.SUCCESS, (int)this.totalOk.get());
    +        counters.incrementStatus(DatumStatus.FAIL, (int)this.totalFailed.get());
             return counters;
         }
     
    -    public void start() {
    -        backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
    -            @Override
    -            public void run() {
    -                LOGGER.debug("Checking to see if data needs to be flushed");
    -                long time = System.currentTimeMillis() - lastWrite.get();
    -                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
    -                    LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
    -                    flushInternal();
    -                }
    -            }
    -        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
    -        manager = new ElasticsearchClientManager(config);
    -        client = manager.getClient();
    -
    -        LOGGER.info(client.toString());
    -    }
    -
    -    public void flushInternal() {
    -        lock.writeLock().lock();
    +    private synchronized void flushInternal() {
             // we do not have a working bulk request, we can just exit here.
    -        if (this.bulkRequest == null || batchItemsSent == 0)
    +        if (this.bulkRequest == null || this.currentBatchItems.get() == 0)
                 return;
     
    +        // wait for one minute to catch up if it needs to
    +        waitToCatchUp(5, 1 * 60 * 1000);
    +
             // call the flush command.
    -        flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);
    +        flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get());
     
    -        // null the flush request, this will be created in the 'add' function below
    -        this.bulkRequest = null;
    +        // reset the current batch statistics
    +        this.currentBatchItems.set(0);
    +        this.currentBatchBytes.set(0);
     
    -        // record the proper statistics, and add it to our totals.
    -        this.totalSizeInBytes += this.batchSizeInBytes;
    -        this.totalSent += batchItemsSent;
    +        // reset our bulk request builder
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
    +    }
     
    -        // reset the current batch statistics
    -        this.batchSizeInBytes = 0;
    -        this.batchItemsSent = 0;
    +    private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
    +        int counter = 0;
    +        // If we still have 5 batches outstanding, we need to give it a minute to catch up
    +        while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) {
    +            try {
    +                Thread.yield();
    +                Thread.sleep(1);
    +                timeOutThresholdInMS++;
    --- End diff ---
    
    If the throughput warrants a 1 ms sleep, that is fine.
    
    However, is there a reason why you are incrementing the timeout threshold instead of the counter? As written, timeOutThresholdInMS grows on every iteration while counter never does, so the loop can never time out; only the outstanding-batch condition can end it.
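    
    For reference, a minimal sketch of the loop as I assume it was intended, with the counter incremented instead. This keeps the signature and condition from the diff above; the InterruptedException handling is my own assumption since the diff is truncated here, and this is untested:
    
        private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) {
            int counter = 0;
            // Poll until the outstanding batches drop to the threshold, or until we have
            // slept roughly timeOutThresholdInMS milliseconds in 1 ms increments.
            while (this.getBatchesSent() - this.getBatchesResponded() > batchThreshold
                    && counter < timeOutThresholdInMS) {
                try {
                    Thread.yield();
                    Thread.sleep(1);
                    counter++;  // advance the elapsed-time counter; the threshold stays fixed
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // restore interrupt status and stop waiting
                    return;
                }
            }
        }
    
    With that change the wait is bounded at roughly timeOutThresholdInMS milliseconds even if responses never arrive.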


