streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [6/7] incubator-streams git commit: improvements and new test for STREAMS-325
Date Fri, 19 Jun 2015 22:02:44 GMT
improvements and new test for STREAMS-325


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/94c3fd5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/94c3fd5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/94c3fd5e

Branch: refs/heads/master
Commit: 94c3fd5ea577f0710f61e7eb4c5c9af57ec33001
Parents: ebeaa16
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Fri Jun 12 11:29:32 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Fri Jun 12 11:53:14 2015 -0500

----------------------------------------------------------------------
 .../tasks/BroadcastMonitorThread.java           | 96 ++++++--------------
 .../tasks/BroadcastMonitorThreadTest.java       | 15 ++-
 .../local/builders/LocalStreamBuilder.java      |  2 +-
 3 files changed, 44 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index b9db630..20c605e 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -53,15 +53,13 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
     private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
     private static MBeanServer server;
 
-    private long DEFAULT_WAIT_TIME = 30000;
-    private long waitTime;
-    private ObjectMapper objectMapper;
-    private StreamsConfiguration streamConfig;
     private MonitoringConfiguration configuration;
     private URI broadcastURI = null;
     private MessagePersister messagePersister;
     private volatile boolean keepRunning;
 
+    private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
+
     /**
      * DEPRECATED
      * Please initialize logging with monitoring object via typesafe
@@ -69,24 +67,25 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
      */
     @Deprecated
     public BroadcastMonitorThread(Map<String, Object> streamConfig) {
-        this(new ObjectMapper().convertValue(streamConfig, StreamsConfiguration.class));
+        this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
     }
 
     public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
-        this(streamConfig, new ObjectMapper().convertValue(streamConfig.getAdditionalProperties().get("monitoring"),
MonitoringConfiguration.class));
+        this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"),
MonitoringConfiguration.class));
     }
 
-    public BroadcastMonitorThread(StreamsConfiguration streamConfig, MonitoringConfiguration
configuration) {
+    public BroadcastMonitorThread(MonitoringConfiguration configuration) {
 
-        this.streamConfig = streamConfig;
         this.configuration = configuration;
+        if( this.configuration == null )
+            this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
 
         LOGGER.info("BroadcastMonitorThread created");
 
-        prepare();
-
         initializeObjectMapper();
 
+        prepare();
+
         LOGGER.info("BroadcastMonitorThread initialized");
 
     }
@@ -97,7 +96,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
      * POJOs which are generated from JSON schemas
      */
     private void initializeObjectMapper() {
-        objectMapper = StreamsJacksonMapper.getInstance();
         SimpleModule simpleModule = new SimpleModule();
 
         simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
@@ -142,7 +140,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
                 }
 
                 messagePersister.persistMessages(messages);
-                Thread.sleep(waitTime);
+                Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
             } catch (InterruptedException e) {
                 LOGGER.debug("Broadcast Monitor Interrupted!");
                 Thread.currentThread().interrupt();
@@ -154,60 +152,33 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
         }
     }
 
-    /**
-     * Go through streams config and set the broadcastURI (if present)
-     */
-    private void setBroadcastURI() {
-        if(configuration != null &&
-            configuration.getBroadcastURI() != null) {
-            try {
-                broadcastURI = new URI(configuration.getBroadcastURI());
-            } catch (URISyntaxException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    /**
-     * Go through streams config and set the thread's wait time (if present)
-     */
-    private void setWaitTime() {
-        try {
-            if (configuration != null &&
-                configuration.getMonitoringBroadcastIntervalMs() != null ) {
-                waitTime = configuration.getMonitoringBroadcastIntervalMs();
-            } else {
-                waitTime = DEFAULT_WAIT_TIME;
-            }
-
-            //Shutdown
-            if(waitTime == -1) {
-                this.keepRunning = false;
-            }
-        } catch (Exception e) {
-            LOGGER.error("Exception while trying to set default broadcast thread wait time:
{}", e);
-        }
-    }
-
     public void prepare() {
 
         keepRunning = true;
 
-        LOGGER.info("BroadcastMonitorThread setup " + configuration);
+        LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
 
         server = ManagementFactory.getPlatformMBeanServer();
 
-        setBroadcastURI();
-        setWaitTime();
+        if (this.configuration != null) {
+
+            try {
+                broadcastURI = new URI(configuration.getBroadcastURI());
+            } catch (Exception e) {
+                LOGGER.error("invalid URI: ", e);
+            }
 
-        if( broadcastURI != null ) {
-            if (broadcastURI.getScheme().equals("http")) {
-                messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
-            } else if (broadcastURI.getScheme().equals("udp")) {
-                messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+            if (broadcastURI != null) {
+                if (broadcastURI.getScheme().equals("http")) {
+                    messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
+                } else if (broadcastURI.getScheme().equals("udp")) {
+                    messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+                } else {
+                    LOGGER.error("You need to specify a broadcast URI with either a HTTP
or UDP protocol defined.");
+                    throw new RuntimeException();
+                }
             } else {
-                LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP
protocol defined.");
-                throw new RuntimeException();
+                messagePersister = new SLF4JMessagePersister();
             }
         } else {
             messagePersister = new SLF4JMessagePersister();
@@ -221,18 +192,11 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
     }
 
     public String getBroadcastURI() {
-        return broadcastURI.toString();
+        return configuration.getBroadcastURI();
     }
 
     public long getWaitTime() {
-        return waitTime;
-    }
-
-    public long getDefaultWaitTime() {
-        return DEFAULT_WAIT_TIME;
+        return configuration.getMonitoringBroadcastIntervalMs();
     }
 
-    public void setDefaultWaitTime(long defaultWaitTime) {
-        this.DEFAULT_WAIT_TIME = defaultWaitTime;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index 427757b..a959bd2 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -52,9 +52,20 @@ public class BroadcastMonitorThreadTest {
         testThread(thread);
     }
 
+    @Test
+    public void testThreadStreamsConfig() {
+
+        StreamsConfiguration streams = new StreamsConfiguration();
+        MonitoringConfiguration monitoring = new MonitoringConfiguration();
+        monitoring.setBroadcastURI("http://fakeurl.com/fake");
+        monitoring.setMonitoringBroadcastIntervalMs(30000L);
+        streams.setAdditionalProperty("monitoring", monitoring);
+        BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
+        testThread(thread);
+    }
+
     public void testThread(BroadcastMonitorThread thread) {
-        thread.setDefaultWaitTime(3000L);
-        long testRunLength = thread.getDefaultWaitTime() * 1;
+        long testRunLength = thread.getWaitTime() * 1;
         executor = Executors.newFixedThreadPool(1);
         executor.submit(thread);
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 4175e10..4cf078e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -151,7 +151,7 @@ public class LocalStreamBuilder implements StreamBuilder {
             }
         };
         this.useDeprecatedMonitors = false;
-        this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
+        this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig.getMonitoring());
     }
 
     public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) {


Mime
View raw message