streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [5/6] git commit: Merge remote-tracking branch 'origin/pr/7' into streamstutorial
Date Thu, 08 May 2014 19:33:20 GMT
Merge remote-tracking branch 'origin/pr/7' into streamstutorial

Conflicts:
	streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
	streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/145ec847
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/145ec847
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/145ec847

Branch: refs/heads/streamstutorial
Commit: 145ec8472c9cf12e8e376ce09596f354644e587b
Parents: 7293594 f62afa5
Author: sblackmon <sblackmon@w2odigital.com>
Authored: Thu May 8 14:15:27 2014 -0500
Committer: sblackmon <sblackmon@w2odigital.com>
Committed: Thu May 8 14:15:27 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchConfigurator.java              |   4 +-
 .../ElasticsearchPersistWriter.java             | 273 +++++++++----------
 .../ElasticsearchWriterConfiguration.json       |   5 +-
 .../provider/SysomosHeartbeatStream.java        |  45 +--
 .../local/builders/LocalStreamBuilder.java      | 161 +++++++----
 .../local/tasks/StreamsPersistWriterTask.java   |   6 +-
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 7 files changed, 271 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
index 9ac234c,8a343a6..e07e50f
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
@@@ -51,17 -51,12 +51,19 @@@ public class ElasticsearchConfigurator 
  
          String index = elasticsearch.getString("index");
          String type = elasticsearch.getString("type");
+         Long maxMsBeforeFlush = elasticsearch.hasPath("MaxTimeBetweenFlushMs") ? elasticsearch.getLong("MaxTimeBetweenFlushMs") : null;
  
 +        if( elasticsearch.hasPath("bulk"))
 +            elasticsearchWriterConfiguration.setBulk(elasticsearch.getBoolean("bulk"));
 +
 +        if( elasticsearch.hasPath("batchSize"))
 +            elasticsearchWriterConfiguration.setBatchSize(elasticsearch.getLong("batchSize"));
 +
          elasticsearchWriterConfiguration.setIndex(index);
          elasticsearchWriterConfiguration.setType(type);
+         elasticsearchWriterConfiguration.setMaxTimeBetweenFlushMs(maxMsBeforeFlush);
  
 +
          return elasticsearchWriterConfiguration;
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 80d2775,5756e1c..ce23197
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@@ -49,8 -54,11 +54,12 @@@ public class ElasticsearchPersistWrite
      private static final long WAITING_DOCS_LIMIT = 10000;
      private static final int BYTES_IN_MB = 1024 * 1024;
      private static final int BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
+     private static final long DEFAULT_MAX_WAIT = 10000;
++    private static final int DEFAULT_BATCH_SIZE = 100;
  
      private final List<String> affectedIndexes = new ArrayList<String>();
+     private final ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+     private final ReadWriteLock lock = new ReentrantReadWriteLock();
  
      private ObjectMapper mapper = new StreamsJacksonMapper();
      private ElasticsearchClientManager manager;
@@@ -59,11 -67,11 +68,10 @@@
      private String parentID = null;
      private BulkRequestBuilder bulkRequest;
      private OutputStreamWriter currentWriter = null;
 -    private int batchSize = 50;
 -    private int totalRecordsWritten = 0;
 -    private long maxMsBeforeFlush;
++    private int batchSize;
++    private long maxTimeBetweenFlushMs;
+     private boolean veryLargeBulk = false;  // by default this setting is set to false
  
-     private long batchSize;
-     private boolean veryLargeBulk;  // by default this setting is set to false
-     private int totalRecordsWritten = 0;
-     
      protected Thread task;
  
      protected volatile Queue<StreamsDatum> persistQueue;
@@@ -99,8 -106,7 +108,6 @@@
          this.veryLargeBulk = veryLargeBulk;
      }
  
-     private final List<String> affectedIndexes = new ArrayList<String>();
--
      public int getTotalOutstanding() {
          return this.totalSent - (this.totalFailed + this.totalOk);
      }
@@@ -151,6 -155,6 +156,14 @@@
          this.flushThresholdSizeInBytes = sizeInBytes;
      }
  
++    public long getMaxTimeBetweenFlushMs() {
++        return maxTimeBetweenFlushMs;
++    }
++
++    public void setMaxTimeBetweenFlushMs(long maxTimeBetweenFlushMs) {
++        this.maxTimeBetweenFlushMs = maxTimeBetweenFlushMs;
++    }
++
      public boolean isConnected() {
          return (client != null);
      }
@@@ -252,12 -250,13 +259,6 @@@
      }
  
      @Override
--    public void prepare(Object configurationObject) {
-         mapper = StreamsJacksonMapper.getInstance();
-         start();
-     }
- 
-     @Override
 -        maxMsBeforeFlush = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs();
 -        mapper = StreamsJacksonMapper.getInstance();
 -        start();
 -    }
 -
 -    @Override
      public DatumStatusCounter getDatumStatusCounter() {
          DatumStatusCounter counters = new DatumStatusCounter();
          counters.incrementAttempt(this.batchItemsSent);
@@@ -267,7 -266,17 +268,17 @@@
      }
  
      public void start() {
- 
+         backgroundFlushTask.scheduleWithFixedDelay(new Runnable() {
+             @Override
+             public void run() {
+                 LOGGER.debug("Checking to see if data needs to be flushed");
+                 long time = System.currentTimeMillis() - lastWrite.get();
 -                if (time > maxMsBeforeFlush && batchItemsSent > 0) {
++                if (time > maxTimeBetweenFlushMs && batchItemsSent > 0) {
+                     LOGGER.debug("Background Flush task determined {} are waiting to be flushed.  It has been {} since the last write to ES", batchItemsSent, time);
+                     flushInternal();
+                 }
+             }
 -        }, 0, maxMsBeforeFlush * 2, TimeUnit.MILLISECONDS);
++        }, 0, maxTimeBetweenFlushMs * 2, TimeUnit.MILLISECONDS);
          manager = new ElasticsearchClientManager(config);
          client = manager.getClient();
  
@@@ -383,70 -394,20 +396,62 @@@
      }
  
      public void add(IndexRequest indexRequest) {
-         synchronized (this) {
-             checkAndCreateBulkRequest();
-             checkIndexImplications(indexRequest.index());
-             bulkRequest.add(indexRequest);
-             try {
-                 trackItemAndBytesWritten(indexRequest.source().length());
-             } catch (NullPointerException x) {
-                 LOGGER.warn("NPE adding/sizing indexrequest");
-             }
+         lock.writeLock().lock();
+         checkAndCreateBulkRequest();
+         checkIndexImplications(indexRequest.index());
+         bulkRequest.add(indexRequest);
+         try {
+             trackItemAndBytesWritten(indexRequest.source().length());
+         } catch (NullPointerException x) {
+             LOGGER.warn("NPE adding/sizing indexrequest");
+         } finally {
+             lock.writeLock().unlock();
          }
+ 
      }
  
 +    private void trackItemAndBytesWritten(long sizeInBytes)
 +    {
 +        currentItems++;
 +        batchItemsSent++;
 +        batchSizeInBytes += sizeInBytes;
 +
 +        // If our queue is larger than our flush threashold, then we should flush the queue.
 +        if( (batchSizeInBytes > flushThresholdSizeInBytes) ||
 +                (currentItems >= batchSize) )
 +            flushInternal();
 +    }
 +
-     private void checkAndCreateBulkRequest()
-     {
-         // Synchronize to ensure that we don't lose any records
-         synchronized (this)
-         {
-             if(bulkRequest == null)
-                 bulkRequest = this.manager.getClient().prepareBulk();
-         }
-     }
- 
 +    private void checkIndexImplications(String indexName)
 +    {
 +
 +        // check to see if we have seen this index before.
 +        if(this.affectedIndexes.contains(indexName))
 +            return;
 +
 +        // we haven't log this index.
 +        this.affectedIndexes.add(indexName);
 +
 +        // Check to see if we are in 'veryLargeBulk' mode
 +        // if we aren't, exit early
 +        if(!this.veryLargeBulk)
 +            return;
 +
 +
 +        // They are in 'very large bulk' mode we want to turn off refreshing the index.
 +
 +        // Create a request then add the setting to tell it to stop refreshing the interval
 +        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
 +        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
 +
 +        // submit to ElasticSearch
 +        this.manager.getClient()
 +                .admin()
 +                .indices()
 +                .updateSettings(updateSettingsRequest)
 +                .actionGet();
 +    }
 +
      public void createIndexIfMissing(String indexName) {
          if (!this.manager.getClient()
                  .admin()
@@@ -512,15 -473,15 +517,24 @@@
          return toReturn;
      }
  
 +    @Override
 +    public void prepare(Object configurationObject) {
 +        mapper = StreamsJacksonMapper.getInstance();
-         veryLargeBulk = this.config.getBulk();
-         batchSize = this.config.getBatchSize();
++        veryLargeBulk = config.getBulk() == null ? Boolean.FALSE : config.getBulk();
++        batchSize = config.getBatchSize() == null ? DEFAULT_BATCH_SIZE : (int)(config.getBatchSize().longValue());
++        maxTimeBetweenFlushMs = config.getMaxTimeBetweenFlushMs() == null ? DEFAULT_MAX_WAIT : config.getMaxTimeBetweenFlushMs().longValue();
 +        start();
 +    }
 +    
+     /**
+      * This method is to ONLY be called by flushInternal otherwise the counts will be off.
+      * @param bulkRequest
+      * @param thisSent
+      * @param thisSizeInBytes
+      */
      private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes) {
+         final Object messenger = new Object();
+         LOGGER.debug("Attempting to write {} items to ES", thisSent);
          bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
              @Override
              public void onResponse(BulkResponse bulkItemResponses) {
@@@ -558,18 -521,16 +574,9 @@@
                  e.printStackTrace();
              }
          });
- 
-         this.notify();
      }
  
--    private void trackItemAndBytesWritten(long sizeInBytes) {
--        batchItemsSent++;
--        batchSizeInBytes += sizeInBytes;
  
--        // If our queue is larger than our flush threashold, then we should flush the queue.
--        if (batchSizeInBytes > flushThresholdSizeInBytes)
--            flushInternal();
--    }
  
      private void checkAndCreateBulkRequest() {
          // Synchronize to ensure that we don't lose any records
@@@ -579,32 -543,32 +589,5 @@@
          }
      }
  
--    private void checkIndexImplications(String indexName) {
--
--        // check to see if we have seen this index before.
--        if (this.affectedIndexes.contains(indexName))
--            return;
--
--        // we haven't log this index.
--        this.affectedIndexes.add(indexName);
--
--        // Check to see if we are in 'veryLargeBulk' mode
--        // if we aren't, exit early
--        if (!this.veryLargeBulk)
--            return;
--
--
--        // They are in 'very large bulk' mode we want to turn off refreshing the index.
--
--        // Create a request then add the setting to tell it to stop refreshing the interval
--        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
--        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));
--
--        // submit to ElasticSearch
--        this.manager.getClient()
--                .admin()
--                .indices()
--                .updateSettings(updateSettingsRequest)
--                .actionGet();
--    }
  }
++

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/145ec847/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
----------------------------------------------------------------------
diff --cc streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
index 5e169b7,c56aa82..b107be6
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchWriterConfiguration.json
@@@ -14,15 -14,9 +14,18 @@@
              "type": "string",
              "description": "Type to write as"
          },
 -		"MaxTimeBetweenFlushMs": {
 -			"type": "String",
 -			"format": "utc-millisec"
 +        "bulk": {
 +            "type": "boolean",
 +            "description": "Index in large or small batches",
 +            "default": "false"
 +        },
 +        "batchSize": {
 +            "type": "integer",
 +            "description": "Item Count before flush",
 +            "default": 100
-         }
++        },
++		"maxTimeBetweenFlushMs": {
++			"type": "integer"
+ 		}
      }
  }


Mime
View raw message