improvements and new test for STREAMS-325
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/94c3fd5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/94c3fd5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/94c3fd5e
Branch: refs/heads/master
Commit: 94c3fd5ea577f0710f61e7eb4c5c9af57ec33001
Parents: ebeaa16
Author: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Authored: Fri Jun 12 11:29:32 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sblackmon@apache.org>
Committed: Fri Jun 12 11:53:14 2015 -0500
----------------------------------------------------------------------
.../tasks/BroadcastMonitorThread.java | 96 ++++++--------------
.../tasks/BroadcastMonitorThreadTest.java | 15 ++-
.../local/builders/LocalStreamBuilder.java | 2 +-
3 files changed, 44 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index b9db630..20c605e 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -53,15 +53,13 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(BroadcastMonitorThread.class);
private static MBeanServer server;
- private long DEFAULT_WAIT_TIME = 30000;
- private long waitTime;
- private ObjectMapper objectMapper;
- private StreamsConfiguration streamConfig;
private MonitoringConfiguration configuration;
private URI broadcastURI = null;
private MessagePersister messagePersister;
private volatile boolean keepRunning;
+ private static ObjectMapper objectMapper = StreamsJacksonMapper.getInstance();
+
/**
* DEPRECATED
* Please initialize logging with monitoring object via typesafe
@@ -69,24 +67,25 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
*/
@Deprecated
public BroadcastMonitorThread(Map<String, Object> streamConfig) {
- this(new ObjectMapper().convertValue(streamConfig, StreamsConfiguration.class));
+ this(objectMapper.convertValue(streamConfig, MonitoringConfiguration.class));
}
public BroadcastMonitorThread(StreamsConfiguration streamConfig) {
- this(streamConfig, new ObjectMapper().convertValue(streamConfig.getAdditionalProperties().get("monitoring"),
MonitoringConfiguration.class));
+ this(objectMapper.convertValue(streamConfig.getAdditionalProperties().get("monitoring"),
MonitoringConfiguration.class));
}
- public BroadcastMonitorThread(StreamsConfiguration streamConfig, MonitoringConfiguration
configuration) {
+ public BroadcastMonitorThread(MonitoringConfiguration configuration) {
- this.streamConfig = streamConfig;
this.configuration = configuration;
+ if( this.configuration == null )
+ this.configuration = new ComponentConfigurator<>(MonitoringConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().atPath("monitoring"));
LOGGER.info("BroadcastMonitorThread created");
- prepare();
-
initializeObjectMapper();
+ prepare();
+
LOGGER.info("BroadcastMonitorThread initialized");
}
@@ -97,7 +96,6 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
* POJOs which are generated from JSON schemas
*/
private void initializeObjectMapper() {
- objectMapper = StreamsJacksonMapper.getInstance();
SimpleModule simpleModule = new SimpleModule();
simpleModule.addDeserializer(MemoryUsageBroadcast.class, new MemoryUsageDeserializer());
@@ -142,7 +140,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
}
messagePersister.persistMessages(messages);
- Thread.sleep(waitTime);
+ Thread.sleep(configuration.getMonitoringBroadcastIntervalMs());
} catch (InterruptedException e) {
LOGGER.debug("Broadcast Monitor Interrupted!");
Thread.currentThread().interrupt();
@@ -154,60 +152,33 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
}
}
- /**
- * Go through streams config and set the broadcastURI (if present)
- */
- private void setBroadcastURI() {
- if(configuration != null &&
- configuration.getBroadcastURI() != null) {
- try {
- broadcastURI = new URI(configuration.getBroadcastURI());
- } catch (URISyntaxException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * Go through streams config and set the thread's wait time (if present)
- */
- private void setWaitTime() {
- try {
- if (configuration != null &&
- configuration.getMonitoringBroadcastIntervalMs() != null ) {
- waitTime = configuration.getMonitoringBroadcastIntervalMs();
- } else {
- waitTime = DEFAULT_WAIT_TIME;
- }
-
- //Shutdown
- if(waitTime == -1) {
- this.keepRunning = false;
- }
- } catch (Exception e) {
- LOGGER.error("Exception while trying to set default broadcast thread wait time:
{}", e);
- }
- }
-
public void prepare() {
keepRunning = true;
- LOGGER.info("BroadcastMonitorThread setup " + configuration);
+ LOGGER.info("BroadcastMonitorThread setup " + this.configuration);
server = ManagementFactory.getPlatformMBeanServer();
- setBroadcastURI();
- setWaitTime();
+ if (this.configuration != null) {
+
+ try {
+ broadcastURI = new URI(configuration.getBroadcastURI());
+ } catch (Exception e) {
+ LOGGER.error("invalid URI: ", e);
+ }
- if( broadcastURI != null ) {
- if (broadcastURI.getScheme().equals("http")) {
- messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
- } else if (broadcastURI.getScheme().equals("udp")) {
- messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+ if (broadcastURI != null) {
+ if (broadcastURI.getScheme().equals("http")) {
+ messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
+ } else if (broadcastURI.getScheme().equals("udp")) {
+ messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
+ } else {
+ LOGGER.error("You need to specify a broadcast URI with either a HTTP
or UDP protocol defined.");
+ throw new RuntimeException();
+ }
} else {
- LOGGER.error("You need to specify a broadcast URI with either a HTTP or UDP
protocol defined.");
- throw new RuntimeException();
+ messagePersister = new SLF4JMessagePersister();
}
} else {
messagePersister = new SLF4JMessagePersister();
@@ -221,18 +192,11 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
}
public String getBroadcastURI() {
- return broadcastURI.toString();
+ return configuration.getBroadcastURI();
}
public long getWaitTime() {
- return waitTime;
- }
-
- public long getDefaultWaitTime() {
- return DEFAULT_WAIT_TIME;
+ return configuration.getMonitoringBroadcastIntervalMs();
}
- public void setDefaultWaitTime(long defaultWaitTime) {
- this.DEFAULT_WAIT_TIME = defaultWaitTime;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
index 427757b..a959bd2 100644
--- a/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
+++ b/streams-monitoring/src/test/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThreadTest.java
@@ -52,9 +52,20 @@ public class BroadcastMonitorThreadTest {
testThread(thread);
}
+ @Test
+ public void testThreadStreamsConfig() {
+
+ StreamsConfiguration streams = new StreamsConfiguration();
+ MonitoringConfiguration monitoring = new MonitoringConfiguration();
+ monitoring.setBroadcastURI("http://fakeurl.com/fake");
+ monitoring.setMonitoringBroadcastIntervalMs(30000L);
+ streams.setAdditionalProperty("monitoring", monitoring);
+ BroadcastMonitorThread thread = new BroadcastMonitorThread(streams);
+ testThread(thread);
+ }
+
public void testThread(BroadcastMonitorThread thread) {
- thread.setDefaultWaitTime(3000L);
- long testRunLength = thread.getDefaultWaitTime() * 1;
+ long testRunLength = thread.getWaitTime() * 1;
executor = Executors.newFixedThreadPool(1);
executor.submit(thread);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/94c3fd5e/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
index 4175e10..4cf078e 100644
--- a/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
+++ b/streams-runtimes/streams-runtime-local/src/main/java/org/apache/streams/local/builders/LocalStreamBuilder.java
@@ -151,7 +151,7 @@ public class LocalStreamBuilder implements StreamBuilder {
}
};
this.useDeprecatedMonitors = false;
- this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig);
+ this.broadcastMonitor = new BroadcastMonitorThread(this.streamConfig.getMonitoring());
}
public void setUseDeprecatedMonitors(boolean useDeprecatedMonitors) {
|