streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [06/53] [abbrv] git commit: adding platform-level status counters debugging data leak
Date Thu, 17 Apr 2014 20:27:39 GMT
adding platform-level status counters
debugging data leak


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ec28cc5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ec28cc5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ec28cc5e

Branch: refs/heads/master
Commit: ec28cc5e0b4f70f6b1cc1e3aa8911136f4354813
Parents: ab5165a
Author: Steve Blackmon <sblackmon@w2odigital.com>
Authored: Mon Mar 24 15:52:43 2014 -0500
Committer: Steve Blackmon <sblackmon@w2odigital.com>
Committed: Mon Mar 24 15:52:43 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             | 13 +++++++---
 .../apache/streams/core/DatumStatusCounter.java | 14 +++++++++++
 .../local/tasks/StreamsPersistWriterTask.java   | 26 ++++++++++++++++----
 .../local/tasks/StreamsProviderTask.java        | 14 ++++++-----
 4 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 250e15a..9390219 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -6,8 +6,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.*;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -37,7 +36,7 @@ import java.text.DecimalFormat;
 import java.text.NumberFormat;
 import java.util.*;
 
-public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushable, Closeable,
DatumStatusCountable
 {
     public final static String STREAMS_ID = "ElasticsearchPersistWriter";
 
@@ -530,4 +529,12 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter,
Flushab
         start();
     }
 
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        DatumStatusCounter counters = new DatumStatusCounter();
+        counters.incrementAttempt(this.batchItemsSent);
+        counters.incrementStatus(DatumStatus.SUCCESS, this.totalOk);
+        counters.incrementStatus(DatumStatus.FAIL, this.totalFailed);
+        return counters;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
index 8730d73..96f73c9 100644
--- a/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
+++ b/streams-core/src/main/java/org/apache/streams/core/DatumStatusCounter.java
@@ -29,6 +29,10 @@ public class DatumStatusCounter
         this.attempted += 1;
     }
 
+    public void incrementAttempt(int counter) {
+        this.attempted += counter;
+    }
+
     public synchronized void incrementStatus(DatumStatus workStatus) {
         // add this to the record counter
         switch(workStatus) {
@@ -39,6 +43,16 @@ public class DatumStatusCounter
         this.emitted += 1;
     }
 
+    public synchronized void incrementStatus(DatumStatus workStatus, int counter) {
+        // add this to the record counter
+        switch(workStatus) {
+            case SUCCESS: this.success += counter; break;
+            case PARTIAL: this.partial += counter; break;
+            case FAIL: this.fail += counter; break;
+        }
+        this.emitted += counter;
+    }
+
     @Override
     public String toString() {
         return "DatumStatusCounter{" +

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 882bcb7..1eac1d9 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -1,7 +1,8 @@
 package org.apache.streams.local.tasks;
 
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.core.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -12,9 +13,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  *
  */
-public class StreamsPersistWriterTask extends BaseStreamsTask {
-
+public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumStatusCountable
{
 
+    private final static Logger LOGGER = LoggerFactory.getLogger(StreamsPersistWriterTask.class);
 
     private StreamsPersistWriter writer;
     private long sleepTime;
@@ -23,6 +24,14 @@ public class StreamsPersistWriterTask extends BaseStreamsTask {
     private Queue<StreamsDatum> inQueue;
     private AtomicBoolean isRunning;
 
+    private DatumStatusCounter statusCounter = new DatumStatusCounter();
+
+    @Override
+    public DatumStatusCounter getDatumStatusCounter() {
+        return this.statusCounter;
+    }
+
+
     /**
      * Default constructor.  Uses default sleep of 500ms when inbound queue is empty.
      * @param writer writer to execute in task
@@ -65,7 +74,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask {
             StreamsDatum datum = this.inQueue.poll();
             while(datum != null || this.keepRunning.get()) {
                 if(datum != null) {
-                    this.writer.write(datum);
+                    try {
+                        this.writer.write(datum);
+                        statusCounter.incrementStatus(DatumStatus.SUCCESS);
+                    } catch (Exception e) {
+                        this.keepRunning.set(false);
+                        statusCounter.incrementStatus(DatumStatus.FAIL);
+                    }
                 }
                 else {
                     try {
@@ -100,4 +115,5 @@ public class StreamsPersistWriterTask extends BaseStreamsTask {
         queues.add(this.inQueue);
         return queues;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ec28cc5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 7b6792f..5cf515c 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -119,10 +119,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
                                 zeros++;
                             else {
                                 zeros = 0;
-                                if( resultSet.getCounter() != null ) {
-                                    LOGGER.debug(resultSet.getCounter().toString());
-                                    this.statusCounter.add(resultSet.getCounter());
-                                }
                             }
                             flushResults(resultSet);
                             if( zeros > (DEFAULT_TIMEOUT_MS / DEFAULT_SLEEP_TIME_MS))
@@ -162,8 +158,14 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
             if(!this.keepRunning.get()) {
                 break;
             }
-            if(datum != null)
-                super.addToOutgoingQueue(datum);
+            if(datum != null) {
+                try {
+                    super.addToOutgoingQueue(datum);
+                    statusCounter.incrementStatus(DatumStatus.SUCCESS);
+                } catch( Exception e ) {
+                    statusCounter.incrementStatus(DatumStatus.FAIL);
+                }
+            }
             else {
                 try {
                     Thread.sleep(DEFAULT_SLEEP_TIME_MS);


Mime
View raw message