karaf-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KARAF-5454) Collector socket - Add UDP protocol support
Date Thu, 16 Nov 2017 14:49:00 GMT

    [ https://issues.apache.org/jira/browse/KARAF-5454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16255439#comment-16255439
] 

ASF GitHub Bot commented on KARAF-5454:
---------------------------------------

jbonofre closed pull request #18: [KARAF-5454] - Collector socket - Add UDP protocol support
URL: https://github.com/apache/karaf-decanter/pull/18
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
index b9a04b2..ed67ea7 100644
--- a/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
+++ b/collector/socket/src/main/cfg/org.apache.karaf.decanter.collector.socket.cfg
@@ -4,4 +4,7 @@
 #port=34343
 
 # Number of worker threads to deal with
-#workers=10
\ No newline at end of file
+#workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 49ddb63..3c1e48e 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -17,6 +17,8 @@
 package org.apache.karaf.decanter.collector.socket;
 
 import java.io.*;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +51,8 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(SocketCollector.class);
 
     private ServerSocket serverSocket;
+    private DatagramSocket datagramSocket;
+    private Protocol protocol;
     private EventAdmin eventAdmin;
     private boolean open;
     private ExecutorService executor;
@@ -56,6 +60,11 @@
     private String eventAdminTopic;
     private EventAdmin dispatcher;
     private Unmarshaller unmarshaller;
+    
+    private enum Protocol {
+        TCP,
+        UDP;
+    }
 
     @SuppressWarnings("unchecked")
     @Activate
@@ -63,8 +72,24 @@ public void activate(ComponentContext context) throws IOException {
         this.properties = context.getProperties();
         int port = Integer.parseInt(getProperty(this.properties, "port", "34343"));
         int workers = Integer.parseInt(getProperty(this.properties, "workers", "10"));
+        
+        this.protocol = Protocol.valueOf(getProperty(this.properties, "protocol", "tcp").toUpperCase());
+        // force TCP protocol if value not in Enum
+        if (this.protocol == null) {
+            this.protocol = Protocol.TCP;
+        }
+        
         eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
-        this.serverSocket = new ServerSocket(port);
+        
+        switch (protocol) {
+            case TCP:
+                this.serverSocket = new ServerSocket(port);
+                break;
+            case UDP:
+                this.datagramSocket = new DatagramSocket(port);
+                break;
+        }
+        
         // adding 1 for serverSocket handling
         this.executor = Executors.newFixedThreadPool(workers + 1);
         this.executor.execute(this);
@@ -79,9 +104,21 @@ private String getProperty(Dictionary<String, Object> properties,
String key, St
     public void run() {
         while (open) {
             try {
-                Socket socket = serverSocket.accept();
-                LOGGER.debug("Connected to client at {}", socket.getInetAddress());
-                this.executor.execute(new SocketRunnable(socket));
+                switch (protocol) {
+                    case TCP:
+                        Socket socket = serverSocket.accept();
+                        LOGGER.debug("Connected to TCP client at {}", socket.getInetAddress());
+                        this.executor.execute(new SocketRunnable(socket));
+                        break;
+                        
+                    case UDP:
+                        byte[] buffer = new byte[1024];
+                        DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+                        LOGGER.debug("Connected to UDP client at {}", datagramSocket.getLocalSocketAddress());
+                        datagramSocket.receive(packet);
+                        this.executor.execute(new DatagramRunnable(packet));
+                        break;
+                }
             } catch (IOException e) {
                 LOGGER.warn("Exception receiving log.", e);
             }
@@ -103,7 +140,15 @@ public void close() throws IOException {
         } catch (Exception e) {
             // nothing to do
         }
-        serverSocket.close();
+        switch (protocol) {
+            case TCP:
+                serverSocket.close();
+                break;
+                
+            case UDP:
+                datagramSocket.close();
+                break;
+        }
     }
 
     @Reference
@@ -148,6 +193,41 @@ public void run() {
             }
         }
     }
+    
+    private class DatagramRunnable implements Runnable {
+
+        private DatagramPacket packet;
+
+        public DatagramRunnable(DatagramPacket packet) {
+            this.packet = packet;
+        }
+
+        public void run() {
+            
+            try (ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData()))
{
+                Map<String, Object> data = new HashMap<>();
+                data.put("hostAddress", InetAddress.getLocalHost().getHostAddress());
+                data.put("hostName", InetAddress.getLocalHost().getHostName());
+                data.put("type", "socket");
+                String karafName = System.getProperty("karaf.name");
+                if (karafName != null) {
+                    data.put("karafName", karafName);
+                }
+                try {
+                    data.putAll(unmarshaller.unmarshal(bais));
+                } catch (Exception e) {
+                    // nothing to do
+                }
+                Event event = new Event(eventAdminTopic, data);
+                dispatcher.postEvent(event);
+                datagramSocket.send(packet);
+            } catch (EOFException e) {
+                LOGGER.warn("Client closed the connection", e);
+            } catch (IOException e) {
+                LOGGER.warn("Exception receiving data", e);
+            }
+        }
+    }
 
     @Reference
     public void setDispatcher(EventAdmin dispatcher) {
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index c236409..8a8980d 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -469,10 +469,14 @@ This feature installs a default `etc/org.apache.karaf.decanter.collector.socket.
 
 # Number of worker threads to deal with
 #workers=10
+
+# Protocol tcp(default) or udp
+#protocol=tcp
 ----
 
 * the `port` property contains the port number where the network socket collector is listening
 * the `workers` property contains the number of worker thread the socket collector is using
for connection
+* the `protocol` property contains the protocol used by the collector for transferring data
with the client
 
 ==== JMS
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Collector socket - Add UDP protocol support
> -------------------------------------------
>
>                 Key: KARAF-5454
>                 URL: https://issues.apache.org/jira/browse/KARAF-5454
>             Project: Karaf
>          Issue Type: Improvement
>          Components: decanter
>            Reporter: Francois Papon
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>             Fix For: decanter-1.5.0
>
>
> Syslog sends data over the UDP protocol, and currently this doesn't work
> with the collector-socket version.
> We could use a DatagramSocket instead of a ServerSocket for UDP.
> Modifications :
> - Add a new property in the collector cfg file to define the protocol:
> # Protocol tcp(default) or udp
> #protocol=tcp
> - Instantiate a DatagramSocket when the udp protocol is defined in
> the cfg (org.apache.karaf.decanter.collector.socket.SocketCollector)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message