karaf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [karaf-decanter] branch master updated: [KARAF-5454] - Collector socket - Add UDP protocol support (#18)
Date Thu, 16 Nov 2017 14:48:47 GMT
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/karaf-decanter.git


The following commit(s) were added to refs/heads/master by this push:
     new dc39ee7  [KARAF-5454] - Collector socket - Add UDP protocol support (#18)
dc39ee7 is described below

commit dc39ee7796135348edb80331e1ca416921c04b05
Author: fpapon <6915443+fpapon@users.noreply.github.com>
AuthorDate: Thu Nov 16 18:48:45 2017 +0400

    [KARAF-5454] - Collector socket - Add UDP protocol support (#18)
    
    * [KARAF-5454] - Collector socket - Add UDP protocol support
---
 .../org.apache.karaf.decanter.collector.socket.cfg |  5 +-
 .../decanter/collector/socket/SocketCollector.java | 90 ++++++++++++++++++++--
 .../src/main/asciidoc/user-guide/collectors.adoc   |  4 +
 3 files changed, 93 insertions(+), 6 deletions(-)

diff --git a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
index b9a04b2..ed67ea7 100644
--- a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
+++ b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
@@ -4,4 +4,7 @@
 #port=34343
 
 # Number of worker threads to deal with
-#workers=10
\ No newline at end of file
+#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 49ddb63..3c1e48e 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -17,6 +17,8 @@
 package org.apache.karaf.decanter.collector.socket;
 
 import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +51,8 @@ public class SocketCollector implements Closeable, Runnable {
     private static final Logger LOGGER = LoggerFactory.getLogger(SocketCollector.class);
 
     private ServerSocket serverSocket;
+    private DatagramSocket datagramSocket;
+    private Protocol protocol;
     private EventAdmin eventAdmin;
     private boolean open;
     private ExecutorService executor;
@@ -56,6 +60,11 @@ public class SocketCollector implements Closeable, Runnable {
     private String eventAdminTopic;
     private EventAdmin dispatcher;
     private Unmarshaller unmarshaller;
+    
+    private enum Protocol {
+        TCP,
+        UDP;
+    }
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -63,8 +72,24 @@ public class SocketCollector implements Closeable, Runnable {
         this.properties = context.getProperties();
         int port = Integer.parseInt(getProperty(this.properties, "port", "34343"));
         int workers = Integer.parseInt(getProperty(this.properties, "workers", "10"));
+        
+        this.protocol = Protocol.valueOf(getProperty(this.properties, "protocol", "tcp").toUpperCase());
+        // force TCP protocol if value not in Enum
+        if (this.protocol == null) {
+            this.protocol = Protocol.TCP;
+        }
+        
         eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
-        this.serverSocket = new ServerSocket(port);
+        
+        switch (protocol) {
+            case TCP:
+                this.serverSocket = new ServerSocket(port);
+                break;
+            case UDP:
+                this.datagramSocket = new DatagramSocket(port);
+                break;
+        }
+        
         // adding 1 for serverSocket handling
         this.executor = Executors.newFixedThreadPool(workers + 1);
         this.executor.execute(this);
@@ -79,9 +104,21 @@ public class SocketCollector implements Closeable, Runnable {
     public void run() {
         while (open) {
             try {
-                Socket socket = serverSocket.accept();
-                LOGGER.debug("Connected to client at {}", socket.getInetAddress());
-                this.executor.execute(new SocketRunnable(socket));
+                switch (protocol) {
+                    case TCP:
+                        Socket socket = serverSocket.accept();
+                        LOGGER.debug("Connected to TCP client at {}", socket.getInetAddress());
+                        this.executor.execute(new SocketRunnable(socket));
+                        break;
+                        
+                    case UDP:
+                        byte[] buffer = new byte[1024];
+                        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+                        LOGGER.debug("Connected to UDP client at {}", datagramSocket.getLocalSocketAddress());
+                        datagramSocket.receive(packet);
+                        this.executor.execute(new DatagramRunnable(packet));
+                        break;
+                }
             } catch (IOException e) {
                 LOGGER.warn("Exception receiving log.", e);
             }
@@ -103,7 +140,15 @@ public class SocketCollector implements Closeable, Runnable {
         } catch (Exception e) {
             // nothing to do
         }
-        serverSocket.close();
+        switch (protocol) {
+            case TCP:
+                serverSocket.close();
+                break;
+                
+            case UDP:
+                datagramSocket.close();
+                break;
+        }
     }
 
     @Reference
@@ -148,6 +193,41 @@ public class SocketCollector implements Closeable, Runnable {
             }
         }
     }
+    
+    private class DatagramRunnable implements Runnable {
+
+        private DatagramPacket packet;
+
+        public DatagramRunnable(DatagramPacket packet) {
+            this.packet = packet;
+        }
+
+        public void run() {
+            
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData()))
{
+                Map<String, Object> data = new HashMap<>();
+                data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                data.put("hostName", InetAddress.getLocalHost().getHostName());
+                data.put("type", "socket");
+                String karafName = System.getProperty("karaf.name");
+                if (karafName != null) {
+                    data.put("karafName", karafName);
+                }
+                try {
+                    data.putAll(unmarshaller.unmarshal(bais));
+                } catch (Exception e) {
+                    // nothing to do
+                }
+                Event event = new Event(eventAdminTopic, data);
+                dispatcher.postEvent(event);
+                datagramSocket.send(packet);
+            } catch (EOFException e) {
+                LOGGER.warn("Client closed the connection", e);
+            } catch (IOException e) {
+                LOGGER.warn("Exception receiving data", e);
+            }
+        }
+    }
 
     @Reference
     public void setDispatcher(EventAdmin dispatcher) {
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index c236409..8a8980d 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -469,10 +469,14 @@ This feature installs a default `etc/org.apache.karaf.decanter.collector.socket.
 
 # Number of worker threads to deal with
 #workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
 ----
 
 * the `port` property contains the port number where the network socket collector is listening
 * the `workers` property contains the number of worker thread the socket collector is using
for connection
+* the `protocol` property contains the protocol used by the collector for transferring data
with the client
 
 ==== JMS
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@karaf.apache.org" <commits@karaf.apache.org>'].

Mime
View raw message