streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reba...@apache.org
Subject [2/4] incubator-streams git commit: Refined shutdown processes and logging statements
Date Fri, 21 Nov 2014 00:22:26 GMT
Refined shutdown processes and logging statements


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/26d7ece8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/26d7ece8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/26d7ece8

Branch: refs/heads/master
Commit: 26d7ece894f485d549f4bcea3bfb37d8410d5a63
Parents: 7dd0b77
Author: Ryan Ebanks <ryanebanks@gmail.com>
Authored: Thu Nov 20 17:32:54 2014 -0600
Committer: Ryan Ebanks <ryanebanks@gmail.com>
Committed: Thu Nov 20 17:32:54 2014 -0600

----------------------------------------------------------------------
 .../gplus/provider/GPlusUserDataProvider.java       |  2 +-
 .../streams/local/builders/LocalStreamBuilder.java  | 16 ++++++++++++++++
 .../apache/streams/local/tasks/BaseStreamsTask.java |  2 ++
 .../local/tasks/StreamsPersistWriterTask.java       |  5 +++--
 .../streams/local/tasks/StreamsProcessorTask.java   |  7 +++++--
 .../streams/local/tasks/StreamsProviderTask.java    |  2 ++
 6 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
index 74ed2e7..e2c8c5c 100644
--- a/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
+++ b/streams-contrib/streams-provider-google/google-gplus/src/main/java/com/google/gplus/provider/GPlusUserDataProvider.java
@@ -13,7 +13,7 @@ import java.util.concurrent.BlockingQueue;
  */
 public class GPlusUserDataProvider extends AbstractGPlusProvider{
 
-    public GPlusUserDataProvider(){
+    public GPlusUserDataProvider() {
         super();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index a9afc3c..778407a 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -219,6 +219,8 @@ public class LocalStreamBuilder implements StreamBuilder {
                 }
                 if(isRunning) {
                     Thread.sleep(3000);
+                } else {
+                    LOGGER.info("Stream has completed successfully, shutting down @ {}", System.currentTimeMillis());
                 }
             }
             LOGGER.debug("Components are no longer running or timed out");
@@ -377,6 +379,20 @@ public class LocalStreamBuilder implements StreamBuilder {
         stopInternal(false);
     }
 
+    /**
+     * Attempts to shut down the stream and let all data in flight finish processing.  When shutdown(long) is called, it
+     * immediately stops all {@link org.apache.streams.core.StreamsProvider}s and will attempt to let all {@link org.apache.streams.core.StreamsProcessor}s
+     * and all {@link org.apache.streams.core.StreamsPersistWriter}s finish processing.
+     * @param waitInMs
+     */
+    public void shutdown(Long waitInMs) {
+
+    }
+
+    public void shutdownNow() {
+
+    }
+
     protected void stopInternal(boolean systemExiting) {
         try {
             shutdown(tasks);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
index 9726963..907bce3 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/BaseStreamsTask.java
@@ -229,4 +229,6 @@ public abstract class BaseStreamsTask implements StreamsTask {
             this.streamIdentifier = LocalStreamBuilder.DEFAULT_STREAM_IDENTIFIER;
         }
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 235ee92..050e297 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -111,11 +111,12 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
                 try {
                     this.blocked.set(true);
                     datum = this.inQueue.poll(5, TimeUnit.SECONDS);
-                    this.blocked.set(false);
                 } catch (InterruptedException ie) {
-                    LOGGER.error("Received InterruptedException. Shutting down and re-applying interrupt status.");
+                    LOGGER.debug("Received InterruptedException. Shutting down and re-applying interrupt status.");
                     this.keepRunning.set(false);
                     Thread.currentThread().interrupt();
+                } finally {
+                    this.blocked.set(false);
                 }
                 if(datum != null) {
                     this.counter.incrementReceivedCount();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index c470d0b..2ec6336 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -120,11 +120,12 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
                 try {
                     this.blocked.set(true);
                     datum = this.inQueue.poll(5, TimeUnit.SECONDS);
-                    this.blocked.set(false);
                 } catch (InterruptedException ie) {
-                    LOGGER.warn("Received InteruptedException, shutting down and re-applying interrupt status.");
+                    LOGGER.debug("Received InteruptedException, shutting down and re-applying interrupt status.");
                     this.keepRunning.set(false);
                     Thread.currentThread().interrupt();
+                } finally {
+                    this.blocked.set(false);
                 }
                 if(datum != null) {
                     this.counter.incrementReceivedCount();
@@ -171,4 +172,6 @@ public class StreamsProcessorTask extends BaseStreamsTask implements DatumStatus
     public void setStreamsTaskCounter(StreamsTaskCounter counter) {
         this.counter = counter;
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/26d7ece8/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
index 8c87d7a..044ea67 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProviderTask.java
@@ -247,4 +247,6 @@ public class StreamsProviderTask extends BaseStreamsTask implements DatumStatusC
     public void setStreamsTaskCounter(StreamsTaskCounter counter) {
         this.counter = counter;
     }
+
+
 }


Mime
View raw message