streams-dev mailing list archives

From mfranklin <...@git.apache.org>
Subject [GitHub] incubator-streams pull request: Fixed the ElasticSearchPersistWrit...
Date Fri, 27 Jun 2014 15:58:55 GMT
Github user mfranklin commented on a diff in the pull request:

    https://github.com/apache/incubator-streams/pull/45#discussion_r14300164
  
    --- Diff: streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---
    @@ -39,321 +37,227 @@
     import org.elasticsearch.action.bulk.BulkResponse;
     import org.elasticsearch.action.index.IndexRequest;
     import org.elasticsearch.action.index.IndexRequestBuilder;
    -import org.elasticsearch.action.search.SearchRequestBuilder;
    -import org.elasticsearch.action.update.UpdateRequest;
    -import org.elasticsearch.client.Client;
    +import org.elasticsearch.common.joda.time.DateTime;
     import org.elasticsearch.common.settings.ImmutableSettings;
    -import org.elasticsearch.index.query.IdsQueryBuilder;
    -import org.elasticsearch.search.SearchHit;
    -import org.elasticsearch.search.SearchHits;
    -import org.json.JSONException;
    -import org.json.JSONObject;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    +import com.fasterxml.jackson.core.JsonParser;
     
    -import java.io.Closeable;
    -import java.io.Flushable;
     import java.io.IOException;
    -import java.io.OutputStreamWriter;
    +import java.io.Serializable;
     import java.text.DecimalFormat;
     import java.text.NumberFormat;
     import java.util.*;
    -import java.util.concurrent.Executors;
    -import java.util.concurrent.ScheduledExecutorService;
    -import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicInteger;
     import java.util.concurrent.atomic.AtomicLong;
    -import java.util.concurrent.locks.ReadWriteLock;
    -import java.util.concurrent.locks.ReentrantReadWriteLock;
     
    -public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
    -    public static final String STREAMS_ID = "ElasticsearchPersistWriter";
    +public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumStatusCountable, Serializable {
     
    -    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
    +    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
     
         private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
         private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
         private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
         private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
         private static final long WAITING_DOCS_LIMIT = 10000;
    -    private static final int BYTES_IN_MB = 1024 * 1024;
    -    private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
         private static final long DEFAULT_MAX_WAIT = 10000;
         private static final int DEFAULT_BATCH_SIZE = 100;
     
    +    private static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();
    +
         private final List<String> affectedIndexes = new ArrayList<String>();
    -    private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
    -    //Primary lock for preventing multiple synchronous batches with the same data
    -    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    -    //Create independent locks to synchronize updates that have nothing to do with actually sending data
    -    private final Object countLock = new Object();
    -    private final Object requestLock = new Object();
    -
    -    private ObjectMapper mapper = new StreamsJacksonMapper();
    -    private ElasticsearchClientManager manager;
    -    private ElasticsearchWriterConfiguration config;
    -    private Client client;
    -    private String parentID = null;
    +
    +    private final ElasticsearchClientManager manager;
    +    private final ElasticsearchWriterConfiguration config;
    +
         private BulkRequestBuilder bulkRequest;
    -    private OutputStreamWriter currentWriter = null;
    -    private int batchSize;
    -    private long maxTimeBetweenFlushMs;
    +
         private boolean veryLargeBulk = false;  // by default this setting is set to false
    +    private long flushThresholdsRecords = DEFAULT_BATCH_SIZE;
    +    private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD;
     
    -    protected Thread task;
    -
    -    protected volatile Queue<StreamsDatum> persistQueue;
    -    protected final List<ListenableActionFuture<BulkResponse>> responses = Lists.newLinkedList();
    -
    -    private volatile int currentItems = 0;
    -    private volatile int totalSent = 0;
    -    private volatile int totalSeconds = 0;
    -    private volatile int totalAttempted = 0;
    -    private volatile int totalOk = 0;
    -    private volatile int totalFailed = 0;
    -    private volatile int totalBatchCount = 0;
    -    private volatile int totalRecordsWritten = 0;
    -    private volatile long totalSizeInBytes = 0;
    -    private volatile long batchSizeInBytes = 0;
    -    private volatile int batchItemsSent = 0;
    -    private volatile int totalByteCount = 0;
    -    private volatile int byteCount = 0;
    -    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
    +    private long flushThresholdTime = DEFAULT_MAX_WAIT;
    +    private long lastFlush = new Date().getTime();
    +    private Timer timer = new Timer();
     
    -    public ElasticsearchPersistWriter() {
    -        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
    -        this.config = ElasticsearchConfigurator.detectWriterConfiguration(config);
    -    }
     
    -    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    -        this.config = config;
    -    }
    +    private final AtomicInteger batchesSent = new AtomicInteger(0);
    +    private final AtomicInteger batchesResponded = new AtomicInteger(0);
     
    -    public void setBatchSize(int batchSize) {
    -        this.batchSize = batchSize;
    -    }
    +    private final AtomicLong currentBatchItems = new AtomicLong(0);
    +    private final AtomicLong currentBatchBytes = new AtomicLong(0);
     
    -    public void setVeryLargeBulk(boolean veryLargeBulk) {
    -        this.veryLargeBulk = veryLargeBulk;
    -    }
    +    private final AtomicLong totalSent = new AtomicLong(0);
    +    private final AtomicLong totalSeconds = new AtomicLong(0);
    +    private final AtomicLong totalOk = new AtomicLong(0);
    +    private final AtomicLong totalFailed = new AtomicLong(0);
    +    private final AtomicLong totalSizeInBytes = new AtomicLong(0);
     
    -    public int getTotalOutstanding() {
    -        return this.totalSent - (this.totalFailed + this.totalOk);
    -    }
    -
    -    public long getFlushThresholdSizeInBytes() {
    -        return flushThresholdSizeInBytes;
    +    public ElasticsearchPersistWriter() {
    +        this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
         }
     
    -    public int getTotalSent() {
    -        return totalSent;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) {
    +        this(config, new ElasticsearchClientManager(config));
         }
     
    -    public int getTotalSeconds() {
    -        return totalSeconds;
    +    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) {
    +        this.config = config;
    +        this.manager = manager;
    +        this.bulkRequest = this.manager.getClient().prepareBulk();
         }
     
    -    public int getTotalOk() {
    -        return totalOk;
    -    }
    +    public long getBatchesSent()                            { return this.batchesSent.get(); }
    +    public long getBatchesResponded()                       { return batchesResponded.get(); }
     
    -    public int getTotalFailed() {
    -        return totalFailed;
    -    }
     
    -    public int getTotalBatchCount() {
    -        return totalBatchCount;
    -    }
    +    public long getFlushThresholdsRecords()                 { return this.flushThresholdsRecords; }
    +    public long getFlushThresholdBytes()                    { return this.flushThresholdBytes; }
    +    public long getFlushThreasholdMaxTime()                 { return this.flushThresholdTime; }
     
    -    public long getTotalSizeInBytes() {
    -        return totalSizeInBytes;
    -    }
    +    public void setFlushThresholdRecords(long val)          { this.flushThresholdsRecords = val; }
    +    public void setFlushThresholdBytes(long val)            { this.flushThresholdBytes = val; }
    +    public void setFlushThreasholdMaxTime(long val)         { this.flushThresholdTime = val; }
    +    public void setVeryLargeBulk(boolean veryLargeBulk)     { this.veryLargeBulk = veryLargeBulk; }
     
    -    public long getBatchSizeInBytes() {
    -        return batchSizeInBytes;
    -    }
    +    private long getLastFlush()                             { return this.lastFlush; }
     
    -    public int getBatchItemsSent() {
    -        return batchItemsSent;
    -    }
    +    public long getTotalOutstanding()                       { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); }
    +    public long getTotalSent()                              { return this.totalSent.get(); }
    +    public long getTotalOk()                                { return this.totalOk.get(); }
    +    public long getTotalFailed()                            { return this.totalFailed.get(); }
    +    public long getTotalSizeInBytes()                       { return this.totalSizeInBytes.get(); }
    +    public long getTotalSeconds()                           { return this.totalSeconds.get(); }
    +    public List<String> getAffectedIndexes()                { return this.affectedIndexes; }
     
    -    public List<String> getAffectedIndexes() {
    -        return this.affectedIndexes;
    -    }
    +    public boolean isConnected()                            { return (this.manager.getClient() != null); }
     
    -    public void setFlushThresholdSizeInBytes(long sizeInBytes) {
    -        this.flushThresholdSizeInBytes = sizeInBytes;
    -    }
    +    @Override
    +    public void write(StreamsDatum streamsDatum) {
    +        if(streamsDatum == null || streamsDatum.getDocument() == null)
    +            return;
     
    -    public long getMaxTimeBetweenFlushMs() {
    -        return maxTimeBetweenFlushMs;
    -    }
    +        checkForBackOff();
     
    -    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
    -        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
    +        try {
    +            add(config.getIndex(), config.getType(), streamsDatum.getId(),
    +                    streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()),
    +                    convertAndAppendMetadata(streamsDatum));
    +        } catch (Throwable e) {
    +            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage());
    +        }
         }
     
    -    public boolean isConnected() {
    -        return (client != null);
    -    }
     
    -    @Override
    -    public void write(StreamsDatum streamsDatum) {
    +    private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException {
    +        Object object = streamsDatum.getDocument();
     
    -        String json;
    -        String id = null;
    -        String ts = null;
    -        try {
    -            if( streamsDatum.getId() != null ) {
    -                id = streamsDatum.getId();
    -            }
    -            if( streamsDatum.getTimestamp() != null ) {
    --- End diff --
    
    I see. I missed that in the line.
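    
    For context on the refactor above: the diff drops the writer's volatile int counters
    and explicit locks in favor of java.util.concurrent atomics, and gates flushing on
    three thresholds: record count (flushThresholdsRecords), batch bytes
    (flushThresholdBytes), and elapsed time since the last flush (flushThresholdTime).
    A minimal sketch of that gating pattern, separate from the PR's actual code and
    using hypothetical names (FlushGate, add, flushed):
    
    import java.util.concurrent.atomic.AtomicLong;
    
    // Illustrative sketch only, not code from the PR. FlushGate, add() and
    // flushed() are hypothetical; the three limits mirror the PR's
    // flushThresholdsRecords / flushThresholdBytes / flushThresholdTime.
    public class FlushGate {
        private final AtomicLong batchItems = new AtomicLong(0); // cf. currentBatchItems
        private final AtomicLong batchBytes = new AtomicLong(0); // cf. currentBatchBytes
        private volatile long lastFlush = System.currentTimeMillis();
    
        private final long maxRecords;
        private final long maxBytes;
        private final long maxWaitMs;
    
        public FlushGate(long maxRecords, long maxBytes, long maxWaitMs) {
            this.maxRecords = maxRecords;
            this.maxBytes = maxBytes;
            this.maxWaitMs = maxWaitMs;
        }
    
        // Record one queued document; true means the caller should flush now.
        public boolean add(long docSizeInBytes) {
            long items = batchItems.incrementAndGet();
            long bytes = batchBytes.addAndGet(docSizeInBytes);
            return items >= maxRecords
                    || bytes >= maxBytes
                    || System.currentTimeMillis() - lastFlush >= maxWaitMs;
        }
    
        // Reset the counters once the bulk request has been sent.
        public void flushed() {
            batchItems.set(0);
            batchBytes.set(0);
            lastFlush = System.currentTimeMillis();
        }
    }
    
    In the PR itself these counters live directly on ElasticsearchPersistWriter; the
    separate class here only keeps the sketch self-contained.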


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
