Repository: incubator-streams
Updated Branches:
refs/heads/master e7bf641d3 -> 39c07f1d6
resolves STREAMS-325 #325
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/8bd22553
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/8bd22553
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/8bd22553
Branch: refs/heads/master
Commit: 8bd225537944305e84a27547b05c24c6c459e7bf
Parents: 6a05779
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Tue May 19 18:00:15 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Tue May 19 18:00:15 2015 -0500
----------------------------------------------------------------------
streams-monitoring/pom.xml | 11 +--
.../tasks/BroadcastMonitorThread.java | 81 ++++++++++++--------
.../monitoring/MonitoringConfiguration.json | 23 ++++++
.../tasks/BroadcastMonitorThreadTest.java | 24 ++++--
.../local/builders/LocalStreamBuilder.java | 19 ++++-
5 files changed, 116 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/streams-monitoring/pom.xml b/streams-monitoring/pom.xml
index 2f66a07..0e3b86e 100644
--- a/streams-monitoring/pom.xml
+++ b/streams-monitoring/pom.xml
@@ -42,6 +42,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcomponents-core</artifactId>
<type>pom</type>
@@ -88,11 +93,7 @@
<addCompileSourceRoot>true</addCompileSourceRoot>
<generateBuilders>true</generateBuilders>
<sourcePaths>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/Broadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/StreamsTaskCounterBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/ThroughputQueueBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/DatumStatusCounterBroadcast.json</sourcePath>
- <sourcePath>src/main/jsonschema/org/apache/streams/pojo/json/MemoryUsageBroadcast.json</sourcePath>
+ <sourcePath>src/main/jsonschema</sourcePath>
</sourcePaths>
<outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
<targetPackage>org.apache.streams.pojo.json</targetPackage>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 19b1067..801d2da 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -21,7 +21,10 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.jackson.*;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
import org.apache.streams.monitoring.persist.MessagePersister;
import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
@@ -52,38 +55,33 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
private long DEFAULT_WAIT_TIME = 30000;
private long waitTime;
private ObjectMapper objectMapper;
- private Map<String, Object> streamConfig;
+ private MonitoringConfiguration configuration;
private URI broadcastURI = null;
private MessagePersister messagePersister;
private volatile boolean keepRunning;
+ /**
+ * DEPRECATED
+ * Please initialize logging with monitoring object via typesafe
+ * @param streamConfig
+ */
+ @Deprecated
public BroadcastMonitorThread(Map<String, Object> streamConfig) {
- keepRunning = true;
- this.streamConfig = streamConfig;
+ this(new ObjectMapper().convertValue(streamConfig, MonitoringConfiguration.class));
+ }
- LOGGER.info("BroadcastMonitorThread starting" + streamConfig);
+ public BroadcastMonitorThread(MonitoringConfiguration configuration) {
- server = ManagementFactory.getPlatformMBeanServer();
+ this.configuration = configuration;
- setBroadcastURI();
- setWaitTime();
+ LOGGER.info("BroadcastMonitorThread created");
- if( broadcastURI != null ) {
- if (broadcastURI.getScheme().equals("http")) {
- messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
- } else if (broadcastURI.getScheme().equals("udp")) {
- messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
- } else {
- LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
- throw new RuntimeException();
- }
- } else {
- messagePersister = new SLF4JMessagePersister();
- }
+ prepare();
initializeObjectMapper();
- LOGGER.info("BroadcastMonitorThread started");
+ LOGGER.info("BroadcastMonitorThread initialized");
+
}
/**
@@ -153,12 +151,10 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
* Go through streams config and set the broadcastURI (if present)
*/
private void setBroadcastURI() {
- if(streamConfig != null &&
- streamConfig.containsKey("broadcastURI") &&
- streamConfig.get("broadcastURI") != null &&
- streamConfig.get("broadcastURI") instanceof String) {
+ if(configuration != null &&
+ configuration.getBroadcastURI() != null) {
try {
- broadcastURI = new URI(streamConfig.get("broadcastURI").toString());
+ broadcastURI = new URI(configuration.getBroadcastURI());
} catch (URISyntaxException e) {
e.printStackTrace();
}
@@ -170,12 +166,9 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
*/
private void setWaitTime() {
try {
- if (streamConfig != null &&
- streamConfig.containsKey("monitoring_broadcast_interval_ms") &&
- streamConfig.get("monitoring_broadcast_interval_ms") != null &&
- (streamConfig.get("monitoring_broadcast_interval_ms") instanceof Long ||
- streamConfig.get("monitoring_broadcast_interval_ms") instanceof Integer)) {
- waitTime = Long.parseLong(streamConfig.get("monitoring_broadcast_interval_ms").toString());
+ if (configuration != null &&
+ configuration.getMonitoringBroadcastIntervalMs() != null ) {
+ waitTime = configuration.getMonitoringBroadcastIntervalMs();
} else {
waitTime = DEFAULT_WAIT_TIME;
}
@@ -189,6 +182,32 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport imple
}
}
+ public void prepare() {
+
+ keepRunning = true;
+
+ LOGGER.info("BroadcastMonitorThread setup " + configuration);
+
+ server = ManagementFactory.getPlatformMBeanServer();
+
+ setBroadcastURI();
+ setWaitTime();
+
+ if( broadcastURI != null ) {
+ if (broadcastURI.getScheme().equals("http")) {
+ messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
+ } else if (broadcastURI.getScheme().equals("udp")) {
+ messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+ } else {
+ LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP protocol defined.");
+ throw new RuntimeException();
+ }
+ } else {
+ messagePersister = new SLF4JMessagePersister();
+ }
+
+ }
+
public void shutdown() {
this.keepRunning = false;
LOGGER.debug("Shutting down BroadcastMonitor Thread");
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json b/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
new file mode 100644
index 0000000..2c50cef
--- /dev/null
+++ b/streams-monitoring/src/main/jsonschema/org/apache/streams/local/monitoring/MonitoringConfiguration.json
@@ -0,0 +1,23 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "title": "object",
+ "javaType": "org.apache.streams.local.monitoring.MonitoringConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description": "Configuration for local runtime monitoring",
+ "properties": {
+ "broadcastURI": {
+ "type": "string",
+ "description": "URI for monitoring statistics"
+ },
+ "monitoring_broadcast_interval_ms": {
+ "type": "integer",
+ "description": "Milliseconds between publish events",
+ "default": 30000
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index 901888e..7ad315f 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -18,6 +18,8 @@
package org.apache.streams.monitoring.tasks;
import com.google.common.collect.Maps;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
+import org.junit.Ignore;
import org.junit.Test;
import java.util.Map;
@@ -28,17 +30,29 @@ public class BroadcastMonitorThreadTest {
private ExecutorService executor;
@Test
- public void testThreadNullConfig() {
- BroadcastMonitorThread thread = new BroadcastMonitorThread(null);
+ public void testThreadEmptyBeanConfig() {
+ MonitoringConfiguration monitoringConfiguration = new MonitoringConfiguration();
+ BroadcastMonitorThread thread = new BroadcastMonitorThread(monitoringConfiguration);
+ testThread(thread);
}
@Test
- public void testThread() {
+ public void testThreadEmptyMapConfig() {
+ Map<String, Object> map = Maps.newHashMap();
+ BroadcastMonitorThread thread = new BroadcastMonitorThread(map);
+ testThread(thread);
+ }
+
+ @Test
+ public void testThreadFakeMapConfig() {
Map<String, Object> config = Maps.newHashMap();
config.put("broadcastURI", "http://fakeurl.com/fake");
-
BroadcastMonitorThread thread = new BroadcastMonitorThread(config);
- thread.setDefaultWaitTime(30000L);
+ testThread(thread);
+ }
+
+ public void testThread(BroadcastMonitorThread thread) {
+ thread.setDefaultWaitTime(3000L);
long testRunLength = thread.getDefaultWaitTime() * 1;
executor = Executors.newFixedThreadPool(1);
executor.submit(thread);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/8bd22553/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 601c6b7..f6d8f23 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -19,9 +19,12 @@
package org.apache.streams.local.builders;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.local.counters.StreamsTaskCounter;
import org.apache.streams.local.executors.ShutdownStreamOnUnhandleThrowableThreadPoolExecutor;
+import org.apache.streams.local.monitoring.MonitoringConfiguration;
import org.apache.streams.local.queues.ThroughputQueue;
import org.apache.streams.local.tasks.*;
import org.apache.streams.monitoring.tasks.BroadcastMonitorThread;
@@ -120,7 +123,21 @@ public class LocalStreamBuilder implements StreamBuilder {
this.streamConfig.put(DEFAULT_STARTED_AT_KEY, startedAt.getMillis());
}
this.useDeprecatedMonitors = false;
- this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
+
+ /* for backward-compatibility with streamConfig */
+ MonitoringConfiguration monitoringConfiguration;
+ if( StreamsConfigurator.getConfig().hasPath("monitoring") ) {
+ monitoringConfiguration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration("monitoring");
+ } else {
+ monitoringConfiguration = new MonitoringConfiguration();
+ }
+ if( this.streamConfig != null &&
+ this.streamConfig.containsKey(BROADCAST_KEY)) {
+ monitoringConfiguration.setBroadcastURI(this.streamConfig.get(BROADCAST_KEY).toString());
+ if( this.streamConfig.containsKey(BROADCAST_INTERVAL_KEY))
+ monitoringConfiguration.setMonitoringBroadcastIntervalMs(Long.parseLong(this.streamConfig.get(BROADCAST_INTERVAL_KEY).toString()));
+ }
+ this.broadcastMonitor = new BroadcastMonitorThread(monitoringConfiguration);
this.futures = new HashMap<>();
}
|