streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [1/7] incubator-streams git commit: resolves STREAMS-325 #325
Date Fri, 19 Jun 2015 22:02:39 GMT
Repository: incubator-streams
Updated Branches:
  refs/heads/master e7bf641d3 -> 39c07f1d6


resolves STREAMS-325 #325


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bd22553
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bd22553
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bd22553

Branch: refs/heads/master
Commit: 8bd225537944305e84a27547b05c24c6c459e7bf
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Tue May 19 18:00:15 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Tue May 19 18:00:15 2015 -0500

----------------------------------------------------------------------
 streams-monitoring/pom.xml                      | 11 +--
 .../tasks/BroadcastMonitorThread.java           | 81 ++++++++++++--------
 .../monitoring/MonitoringConfiguration.json     | 23 ++++++
 .../tasks/BroadcastMonitorThreadTest.java       | 24 ++++--
 .../local/builders/LocalStreamBuilder.java      | 19 ++++-
 5 files changed, 116 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/streams-monitoring/pom.xml b/streams-monitoring/pom.xml
index 2f66a07..0e3b86e 100644
--- a/streams-monitoring/pom.xml
+++ b/streams-monitoring/pom.xml
@@ -42,6 +42,11 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpcomponents-core</artifactId>
             <type>pom</type>
@@ -88,11 +93,7 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
+                        <sourcePath>src/main/jsonschema</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>org.apache.streams.pojo.json</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 19b1067..801d2da 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -21,7 +21,10 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.jackson.*;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
 import org.apache.streams.monitoring.persist.MessagePersister;
 import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
 import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
@@ -52,38 +55,33 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
     private long DEFAULT_WAIT_TIME = 30000;
     private long waitTime;
     private ObjectMapper objectMapper;
-    private Map<String, Object> streamConfig;
+    private MonitoringConfiguration configuration;
     private URI broadcastURI = null;
     private MessagePersister messagePersister;
     private volatile boolean keepRunning;
 
+    /**
+     * DEPRECATED
+     * Please initialize logging with monitoring object via typesafe
+     * @param streamConfig
+     */
+    @Deprecated
     public BroadcastMonitorThread(Map<String, Object> streamConfig) {
-        keepRunning = true;
-        this.streamConfig = streamConfig;
+        this(new ObjectMapper().convertValue(streamConfig, MonitoringConfiguration.class));
+    }
 
-        LOGGER.info("BroadcastMonitorThread starting" + streamConfig);
+    public BroadcastMonitorThread(MonitoringConfiguration configuration) {
 
-        server = ManagementFactory.getPlatformMBeanServer();
+        this.configuration = configuration;
 
-        setBroadcastURI();
-        setWaitTime();
+        LOGGER.info("BroadcastMonitorThread created");
 
-        if( broadcastURI != null ) {
-            if (broadcastURI.getScheme().equals("http")) {
-                messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
-            } else if (broadcastURI.getScheme().equals("udp")) {
-                messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
-            } else {
-                LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP
protocol defined.");
-                throw new RuntimeException();
-            }
-        } else {
-            messagePersister = new SLF4JMessagePersister();
-        }
+        prepare();
 
         initializeObjectMapper();
 
-        LOGGER.info("BroadcastMonitorThread started");
+        LOGGER.info("BroadcastMonitorThread initialized");
+
     }
 
     /**
@@ -153,12 +151,10 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
      * Go through streams config and set the broadcastURI (if present)
      */
     private void setBroadcastURI() {
-        if(streamConfig != null &&
-                streamConfig.containsKey("broadcastURI") &&
-                streamConfig.get("broadcastURI") != null &&
-                streamConfig.get("broadcastURI") instanceof String) {
+        if(configuration != null &&
+            configuration.getBroadcastURI() != null) {
             try {
-                broadcastURI = new URI(streamConfig.get("broadcastURI").toString());
+                broadcastURI = new URI(configuration.getBroadcastURI());
             } catch (URISyntaxException e) {
                 e.printStackTrace();
             }
@@ -170,12 +166,9 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
      */
     private void setWaitTime() {
         try {
-            if (streamConfig != null &&
-                    streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
-                    streamConfig.get("monitoring_broadcast_interval_ms") != null &&
-                    (streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long
||
-                    streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer))
{
-                waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+            if (configuration != null &&
+                configuration.getMonitoringBroadcastIntervalMs() != null ) {
+                waitTime = configuration.getMonitoringBroadcastIntervalMs();
             } else {
                 waitTime = DEFAULT_WAIT_TIME;
             }
@@ -189,6 +182,32 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
         }
     }
 
+    public void prepare() {
+
+        keepRunning = true;
+
+        LOGGER.info("BroadcastMonitorThread setup " + configuration);
+
+        server = ManagementFactory.getPlatformMBeanServer();
+
+        setBroadcastURI();
+        setWaitTime();
+
+        if( broadcastURI != null ) {
+            if (broadcastURI.getScheme().equals("http")) {
+                messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
+            } else if (broadcastURI.getScheme().equals("udp")) {
+                messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+            } else {
+                LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP
protocol defined.");
+                throw new RuntimeException();
+            }
+        } else {
+            messagePersister = new SLF4JMessagePersister();
+        }
+
+    }
+
     public void shutdown() {
         this.keepRunning = false;
         LOGGER.debug("Shutting down BroadcastMonitor Thread");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
b/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
new file mode 100644
index 0000000..2c50cef
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
@@ -0,0 +1,23 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "title": "object",
+    "javaType": "org.apache.streams.local.monitoring.MonitoringConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "description": "Configuration for local runtime monitoring",
+    "properties": {
+        "broadcastURI": {
+            "type": "string",
+            "description": "URI for monitoring statistics"
+        },
+        "monitoring_broadcast_interval_ms": {
+            "type": "integer",
+            "description": "Milliseconds between publish events",
+            "default": 30000
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index 901888e..7ad315f 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -18,6 +18,8 @@
 package org.apache.streams.monitoring.tasks;
 
 import com.google.common.collect.Maps;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.Map;
@@ -28,17 +30,29 @@ public class BroadcastMonitorThreadTest {
     private ExecutorService executor;
 
     @Test
-    public void testThreadNullConfig() {
-        BroadcastMonitorThread thread = new BroadcastMonitorThread(null);
+    public void testThreadEmptyBeanConfig() {
+        MonitoringConfiguration monitoringConfiguration = new MonitoringConfiguration();
+        BroadcastMonitorThread thread = new BroadcastMonitorThread(monitoringConfiguration);
+        testThread(thread);
     }
 
     @Test
-    public void testThread() {
+    public void testThreadEmptyMapConfig() {
+        Map<String, Object> map = Maps.newHashMap();
+        BroadcastMonitorThread thread = new BroadcastMonitorThread(map);
+        testThread(thread);
+    }
+
+    @Test
+    public void testThreadFakeMapConfig() {
         Map<String, Object> config = Maps.newHashMap();
         config.put("broadcastURI", "http://fakeurl.com/fake");
-
         BroadcastMonitorThread thread = new BroadcastMonitorThread(config);
-        thread.setDefaultWaitTime(30000L);
+        testThread(thread);
+    }
+
+    public void testThread(BroadcastMonitorThread thread) {
+        thread.setDefaultWaitTime(3000L);
         long testRunLength = thread.getDefaultWaitTime() * 1;
         executor = Executors.newFixedThreadPool(1);
         executor.submit(thread);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 601c6b7..f6d8f23 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -19,9 +19,12 @@
 package org.apache.streams.local.builders;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.local.counters.StreamsTaskCounter;
 import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
 import org.apache.streams.local.queues.ThroughputQueue;
 import org.apache.streams.local.tasks.*;
 import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
@@ -120,7 +123,21 @@ public class LocalStreamBuilder implements StreamBuilder {
             this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis());
         }
         this.useDeprecatedMonitors = false;
-        this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
+
+        /* for backward-compatibility with streamConfig */
+        MonitoringConfiguration monitoringConfiguration;
+        if( StreamsConfigurator.getConfig().hasPath("monitoring") ) {
+            monitoringConfiguration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration("monitoring");
+        } else {
+            monitoringConfiguration = new MonitoringConfiguration();
+        }
+        if( this.streamConfig != null &&
+            this.streamConfig.containsKey(BROADCAST_KEY)) {
+            monitoringConfiguration.setBroadcastURI(this.streamConfig.get(BROADCAST_KEY).toString());
+            if( this.streamConfig.containsKey(BROADCAST_INTERVAL_KEY))
+                monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong(this.streamConfig.get(BROADCAST_INTERVAL_KEY).toString()));
+        }
+        this.broadcastMonitor = new BroadcastMonitorThread(monitoringConfiguration);
 
         this.futures = new HashMap<>();
     }


Mime
View raw message