streams-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sblack...@apache.org
Subject [2/5] incubator-streams git commit: added working UDP log stash persister
Date Tue, 17 Feb 2015 19:16:51 GMT
added working UDP log stash persister


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5667a7e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5667a7e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5667a7e6

Branch: refs/heads/master
Commit: 5667a7e6a59038060d38c37a15d4c8a30eb11fab
Parents: b65e198
Author: sblackmon <sblackmon@apache.org>
Authored: Thu Feb 12 14:47:13 2015 -0600
Committer: sblackmon <sblackmon@apache.org>
Committed: Thu Feb 12 18:17:19 2015 -0600

----------------------------------------------------------------------
 .../impl/LogstashUdpMessagePersister.java       | 104 +++++++++++++++++++
 .../tasks/BroadcastMonitorThread.java           |  21 ++--
 2 files changed, 119 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5667a7e6/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
new file mode 100644
index 0000000..6e145b9
--- /dev/null
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/persist/impl/LogstashUdpMessagePersister.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.streams.monitoring.persist.impl;
+
+import org.apache.streams.monitoring.persist.MessagePersister;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+public class LogstashUdpMessagePersister implements MessagePersister {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(LogstashUdpMessagePersister.class);
+    private String broadcastURI;
+    URI uri;
+
+    public LogstashUdpMessagePersister(String broadcastURI) {
+        this.broadcastURI = broadcastURI;
+        setup();
+    }
+
+    public void setup() {
+
+        try {
+            uri = new URI(broadcastURI);
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+        }
+
+    }
+    @Override
+    /**
+     * Given a list of messages as Strings, broadcast them to the broadcastURI
+     * (if one is defined)
+     * @param messages
+     * @return int status code from POST response
+     */
+    public int persistMessages(List<String> messages) {
+        int responseCode = -1;
+
+        if(broadcastURI != null) {
+            DatagramSocket socket = null;
+            try {
+                socket = new DatagramSocket();
+            } catch (SocketException e) {
+                LOGGER.error("Metrics Broadcast Setup Failed: " + e.getMessage());
+            }
+            try {
+                ByteBuffer toWrite = ByteBuffer.wrap(serializeMessages(messages).getBytes());
+                byte[] byteArray = toWrite.array();
+                DatagramPacket packet = new DatagramPacket(byteArray, byteArray.length);
+                socket.connect(new InetSocketAddress(uri.getHost(), uri.getPort()));
+                socket.send(packet);
+            } catch( Exception e ) {
+                LOGGER.error("Metrics Broadcast Failed: " + e.getMessage());
+            } finally {
+                socket.close();
+            }
+        }
+
+        return responseCode;
+    }
+
+    /**
+     * Given a List of String messages, convert them to a JSON array
+     * @param messages
+     * @return Serialized version of this JSON array
+     */
+    private String serializeMessages(List<String> messages) {
+
+        StringBuilder json_lines = new StringBuilder();
+        for(String message : messages) {
+            json_lines.append(message).append('\n');
+        }
+
+        return json_lines.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5667a7e6/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
----------------------------------------------------------------------
diff --git a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
index 6867bd3..948863e 100644
--- a/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
+++ b/streams-monitoring/src/main/java/org/apache/streams/monitoring/tasks/BroadcastMonitorThread.java
@@ -20,17 +20,19 @@ package org.apache.streams.monitoring.tasks;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.streams.jackson.*;
 import org.apache.streams.monitoring.persist.MessagePersister;
 import org.apache.streams.monitoring.persist.impl.BroadcastMessagePersister;
+import org.apache.streams.monitoring.persist.impl.LogstashUdpMessagePersister;
 import org.apache.streams.monitoring.persist.impl.SLF4JMessagePersister;
 import org.apache.streams.pojo.json.*;
 import org.slf4j.Logger;
 
 import javax.management.*;
 import java.lang.management.ManagementFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -47,7 +49,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
     private long waitTime;
     private ObjectMapper objectMapper;
     private Map<String, Object> streamConfig;
-    private String broadcastURI = null;
+    private URI broadcastURI = null;
     private MessagePersister messagePersister;
     private volatile boolean keepRunning;
 
@@ -62,8 +64,11 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
         setBroadcastURI();
         setWaitTime();
 
-        if( !Strings.isNullOrEmpty(broadcastURI))
-            messagePersister = new BroadcastMessagePersister(broadcastURI);
+        if( broadcastURI != null )
+            if( broadcastURI.getScheme().equals("http"))
+                messagePersister = new BroadcastMessagePersister(broadcastURI.toString());
+            else if( broadcastURI.getScheme().equals("udp"))
+                messagePersister = new LogstashUdpMessagePersister(broadcastURI.toString());
         else
             messagePersister = new SLF4JMessagePersister();
 
@@ -143,7 +148,11 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
                 streamConfig.containsKey("broadcastURI") &&
                 streamConfig.get("broadcastURI") != null &&
                 streamConfig.get("broadcastURI") instanceof String) {
-            broadcastURI = streamConfig.get("broadcastURI").toString();
+            try {
+                broadcastURI = new URI(streamConfig.get("broadcastURI").toString());
+            } catch (URISyntaxException e) {
+                e.printStackTrace();
+            }
         }
     }
 
@@ -177,7 +186,7 @@ public class BroadcastMonitorThread extends NotificationBroadcasterSupport
imple
     }
 
     public String getBroadcastURI() {
-        return broadcastURI;
+        return broadcastURI.toString();
     }
 
     public long getWaitTime() {


Mime
View raw message