streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [03/14] git commit: Updated termination process
Date Tue, 13 May 2014 15:22:53 GMT
Updated termination process


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6934c7a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6934c7a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6934c7a0

Branch: refs/heads/master
Commit: 6934c7a0d4c479392c6c147d00af8cd9ed7b3c44
Parents: c837a2c
Author: mfranklin <mfranklin@apache.org>
Authored: Thu May 1 18:45:30 2014 -0400
Committer: mfranklin <mfranklin@apache.org>
Committed: Thu May 1 18:45:30 2014 -0400

----------------------------------------------------------------------
 .../local/builders/LocalStreamBuilder.java      | 161 ++++++++++++-------
 .../local/tasks/StreamsPersistWriterTask.java   |   6 +-
 .../local/tasks/StreamsProcessorTask.java       |   2 +-
 3 files changed, 110 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 8e688ba..d313b3f 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -1,5 +1,6 @@
 package org.apache.streams.local.builders;
 
+import org.apache.log4j.spi.LoggerFactory;
 import org.apache.streams.core.*;
 import org.apache.streams.local.tasks.LocalStreamProcessMonitorThread;
 import org.apache.streams.local.tasks.StatusCounterMonitorThread;
@@ -7,6 +8,7 @@ import org.apache.streams.local.tasks.StreamsProviderTask;
 import org.apache.streams.local.tasks.StreamsTask;
 import org.apache.streams.util.SerializationUtil;
 import org.joda.time.DateTime;
+import org.slf4j.Logger;
 
 import java.math.BigInteger;
 import java.util.*;
@@ -22,6 +24,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class LocalStreamBuilder implements StreamBuilder {
 
+    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(LocalStreamBuilder.class);
+
     public static final String TIMEOUT_KEY = "TIMEOUT";
     private Map<String, StreamComponent> providers;
     private Map<String, StreamComponent> components;
@@ -32,6 +36,7 @@ public class LocalStreamBuilder implements StreamBuilder {
     private int totalTasks;
     private int monitorTasks;
     private LocalStreamProcessMonitorThread monitorThread;
+    private Map<String, List<StreamsTask>> tasks;
 
     /**
      *
@@ -139,39 +144,18 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void start() {
+        attachShutdownHandler();
         boolean isRunning = true;
         this.executor = Executors.newFixedThreadPool(this.totalTasks);
         this.monitor = Executors.newFixedThreadPool(this.monitorTasks+1);
         Map<String, StreamsProviderTask> provTasks = new HashMap<String, StreamsProviderTask>();
-        Map<String, List<StreamsTask>> streamsTasks = new HashMap<String, List<StreamsTask>>();
+        tasks = new HashMap<String, List<StreamsTask>>();
         try {
             monitorThread = new LocalStreamProcessMonitorThread(executor, 10);
             this.monitor.submit(monitorThread);
-            for(StreamComponent comp : this.components.values()) {
-                int tasks = comp.getNumTasks();
-                List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
-                for(int i=0; i < tasks; ++i) {
-                    StreamsTask task = comp.createConnectedTask(getTimeout());
-                    task.setStreamConfig(this.streamConfig);
-                    this.executor.submit(task);
-                    compTasks.add(task);
-                    if( comp.isOperationCountable() ) {
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
-                        this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                    }
-                }
-                streamsTasks.put(comp.getId(), compTasks);
-            }
-            for(StreamComponent prov : this.providers.values()) {
-                StreamsTask task = prov.createConnectedTask(getTimeout());
-                task.setStreamConfig(this.streamConfig);
-                this.executor.submit(task);
-                provTasks.put(prov.getId(), (StreamsProviderTask) task);
-                if( prov.isOperationCountable() ) {
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
-                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
-                }
-            }
+            setupComponentTasks(tasks);
+            setupProviderTasks(provTasks);
+            LOGGER.info("Started stream with {} components", tasks.size());
             while(isRunning) {
                 isRunning = false;
                 for(StreamsProviderTask task : provTasks.values()) {
@@ -184,44 +168,98 @@ public class LocalStreamBuilder implements StreamBuilder {
                     Thread.sleep(3000);
                 }
             }
-            monitorThread.shutdown();
-            this.executor.shutdown();
-            //complete stream shut down gracfully
-            for(StreamComponent prov : this.providers.values()) {
-                shutDownTask(prov, streamsTasks);
+            LOGGER.debug("Components are no longer running or timed out due to completion");
+            shutdown(tasks);
+        } catch (InterruptedException e){
+            forceShutdown(tasks);
+        }
+
+    }
+
+    private void attachShutdownHandler() {
+        final LocalStreamBuilder self = this;
+        LOGGER.debug("Attaching shutdown handler");
+        Runtime.getRuntime().addShutdownHook(new Thread(){
+            @Override
+            public void run() {
+                LOGGER.debug("Shutdown hook received.  Beginning shutdown");
+                self.stop();
+            }
+        });
+    }
+
+    protected void forceShutdown(Map<String, List<StreamsTask>> streamsTasks) {
+        LOGGER.debug("Shutdown failed.  Forcing shutdown");
+        //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
+        for(List<StreamsTask> tasks : streamsTasks.values()) {
+            for(StreamsTask task : tasks) {
+                task.stopTask();
             }
-            //need to make this configurable
-            if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+        }
+        this.executor.shutdown();
+        this.monitor.shutdown();
+        try {
+            if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
                 this.executor.shutdownNow();
-                this.executor.awaitTermination(10, TimeUnit.SECONDS);
             }
-            if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
                 this.monitor.shutdownNow();
-                this.monitor.awaitTermination(5, TimeUnit.SECONDS);
             }
-        } catch (InterruptedException e){
-            //give the stream 30secs to try to shutdown gracefully, then force shutdown otherwise
-            for(List<StreamsTask> tasks : streamsTasks.values()) {
-                for(StreamsTask task : tasks) {
-                    task.stopTask();
-                }
+        }catch (InterruptedException ie) {
+            this.executor.shutdownNow();
+            this.monitor.shutdownNow();
+            throw new RuntimeException(ie);
+        }
+    }
+
+    protected void shutdown(Map<String, List<StreamsTask>> streamsTasks) throws InterruptedException {
+        LOGGER.info("Attempting to shutdown tasks");
+        monitorThread.shutdown();
+        this.executor.shutdown();
+        //complete stream shut down gracfully
+        for(StreamComponent prov : this.providers.values()) {
+            shutDownTask(prov, streamsTasks);
+        }
+        //need to make this configurable
+        if(!this.executor.awaitTermination(10, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            this.executor.shutdownNow();
+            this.executor.awaitTermination(10, TimeUnit.SECONDS);
+        }
+        if(!this.monitor.awaitTermination(5, TimeUnit.SECONDS)) { // all threads should have terminated already.
+            this.monitor.shutdownNow();
+            this.monitor.awaitTermination(5, TimeUnit.SECONDS);
+        }
+    }
+
+    protected void setupProviderTasks(Map<String, StreamsProviderTask> provTasks) {
+        for(StreamComponent prov : this.providers.values()) {
+            StreamsTask task = prov.createConnectedTask(getTimeout());
+            task.setStreamConfig(this.streamConfig);
+            this.executor.submit(task);
+            provTasks.put(prov.getId(), (StreamsProviderTask) task);
+            if( prov.isOperationCountable() ) {
+                this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) prov.getOperation(), 10));
+                this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
             }
-            this.executor.shutdown();
-            this.monitor.shutdown();
-            try {
-                if(!this.executor.awaitTermination(3, TimeUnit.SECONDS)){
-                    this.executor.shutdownNow();
-                }
-                if(!this.monitor.awaitTermination(3, TimeUnit.SECONDS)){
-                    this.monitor.shutdownNow();
+        }
+    }
+
+    protected void setupComponentTasks(Map<String, List<StreamsTask>> streamsTasks) {
+        for(StreamComponent comp : this.components.values()) {
+            int tasks = comp.getNumTasks();
+            List<StreamsTask> compTasks = new LinkedList<StreamsTask>();
+            for(int i=0; i < tasks; ++i) {
+                StreamsTask task = comp.createConnectedTask(getTimeout());
+                task.setStreamConfig(this.streamConfig);
+                this.executor.submit(task);
+                compTasks.add(task);
+                if( comp.isOperationCountable() ) {
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) comp.getOperation(), 10));
+                    this.monitor.submit(new StatusCounterMonitorThread((DatumStatusCountable) task, 10));
                 }
-            }catch (InterruptedException ie) {
-                this.executor.shutdownNow();
-                this.monitor.shutdownNow();
-                throw new RuntimeException(ie);
             }
+            streamsTasks.put(comp.getId(), compTasks);
         }
-
     }
 
     /**
@@ -249,8 +287,13 @@ public class LocalStreamBuilder implements StreamBuilder {
                     task.stopTask();
                 }
                 for(StreamsTask task : tasks) {
-                    while(task.isRunning()) {
+                    int count = 0;
+                    while(count < 20 && task.isRunning()) {
                         Thread.sleep(500);
+                        count++;
+                    }
+                    if(task.isRunning()) {
+                        LOGGER.warn("Task {} failed to terminate in allotted timeframe", task.toString());
                     }
                 }
             }
@@ -268,7 +311,11 @@ public class LocalStreamBuilder implements StreamBuilder {
      */
     @Override
     public void stop() {
-
+        try {
+            shutdown(tasks);
+        } catch (Exception e) {
+            forceShutdown(tasks);
+        }
     }
 
     private void connectToOtherComponents(String[] conntectToIds, StreamComponent toBeConnected) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
index 1eac1d9..8146bdd 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsPersistWriterTask.java
@@ -72,12 +72,13 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
         try {
             this.writer.prepare(this.streamConfig);
             StreamsDatum datum = this.inQueue.poll();
-            while(datum != null || this.keepRunning.get()) {
+            while(this.keepRunning.get()) {
                 if(datum != null) {
                     try {
                         this.writer.write(datum);
                         statusCounter.incrementStatus(DatumStatus.SUCCESS);
                     } catch (Exception e) {
+                        LOGGER.error("Error writing to persist writer {}", this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false);
                         statusCounter.incrementStatus(DatumStatus.FAIL);
                     }
@@ -86,12 +87,15 @@ public class StreamsPersistWriterTask extends BaseStreamsTask implements DatumSt
                     try {
                         Thread.sleep(this.sleepTime);
                     } catch (InterruptedException e) {
+                        LOGGER.warn("Thread interrupted in Writer task for {}",this.writer.getClass().getSimpleName(), e);
                         this.keepRunning.set(false);
                     }
                 }
                 datum = this.inQueue.poll();
             }
 
+        } catch(Exception e) {
+            LOGGER.error("Failed to execute Persist Writer {}",this.writer.getClass().getSimpleName(), e);
         } finally {
             this.writer.cleanUp();
             this.isRunning.set(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6934c7a0/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
index d1ac905..d4c7a16 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/tasks/StreamsProcessorTask.java
@@ -67,7 +67,7 @@ public class StreamsProcessorTask extends BaseStreamsTask {
         try {
             this.processor.prepare(this.streamConfig);
             StreamsDatum datum = this.inQueue.poll();
-            while(datum != null || this.keepRunning.get()) {
+            while(this.keepRunning.get()) {
                 if(datum != null) {
                     List<StreamsDatum> output = this.processor.process(datum);
                     if(output != null) {


Mime
View raw message